Flink + DDL reads Kafka but produces no output, although the Kafka console consumer sees the data


Flink + DDL reads Kafka but produces no output, although the Kafka console consumer sees the data

Appleyuchi
This is on Flink 1.12. The Kafka console consumer can read the data, but the code below reads nothing: it runs without any error and produces no output.
Any help is appreciated, thanks.


import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

object FlinkKafkaDDLDemo {

  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(3)

    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tEnv = StreamTableEnvironment.create(env, settings)

    val createTable =
      """
        |CREATE TABLE PERSON (
        |  name    VARCHAR COMMENT 'name',
        |  age     VARCHAR COMMENT 'age',
        |  city    VARCHAR COMMENT 'city',
        |  address VARCHAR COMMENT 'home address',
        |  ts      TIMESTAMP(3) COMMENT 'timestamp'
        |) WITH (
        |  'connector.type' = 'kafka',                   -- use the kafka connector
        |  'connector.version' = 'universal',            -- kafka version
        |  'connector.topic' = 'kafka_ddl',              -- kafka topic
        |  'connector.startup-mode' = 'earliest-offset', -- read from the earliest offset
        |  'connector.properties.0.key' = 'zookeeper.connect', -- connection info
        |  'connector.properties.0.value' = 'Desktop:2181',
        |  'connector.properties.1.key' = 'bootstrap.servers',
        |  'connector.properties.1.value' = 'Desktop:9091',
        |  'update-mode' = 'append',
        |  'format.type' = 'json',         -- source data is json
        |  'format.derive-schema' = 'true' -- derive the json parsing rules from the DDL schema
        |)
      """.stripMargin



    tEnv.executeSql(createTable)

    val query: String = "SELECT name, COUNT(age) FROM PERSON GROUP BY name"

    val result: Table = tEnv.sqlQuery(query)
    // GROUP BY produces an updating result, hence a retract stream rather than an append stream.
    tEnv.toRetractStream[Row](result).print()
    // tEnv.execute("Flink SQL DDL")  // older submission API; see the sketch below
  }
}
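A likely reason nothing is printed: in Flink 1.12, converting the Table back to a DataStream (as toRetractStream does above) puts the print() sink into the DataStream job graph, and that job is only submitted once execute() is called on the StreamExecutionEnvironment; the commented-out tEnv.execute(...) reflects an older API. A minimal sketch of the last lines of main, assuming the missing submit call is the cause:

    val result: Table = tEnv.sqlQuery(query)
    tEnv.toRetractStream[Row](result).print()
    env.execute("Flink SQL DDL") // actually submits the job; without it the sink never runs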


 


Re: Flink + DDL reads Kafka but produces no output, although the Kafka console consumer sees the data

Arvid Heise
Note that you posted to the English-speaking mailing list. For the Chinese-speaking version please use [hidden email].

On Thu, Dec 24, 2020 at 3:39 PM Appleyuchi <[hidden email]> wrote:
> This is on Flink 1.12. The Kafka console consumer can read the data, but the code below reads nothing: it runs without any error and produces no output. Any help is appreciated, thanks.
> [quoted code identical to the message above]



--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng