是Flink1.12的,kafka消费端能读取到数据,但是下面的代码无法读取到数据,运行后没有报错也没有输出,
求助,谢谢
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import scala.math.Ordering.Int
object FlinkKafkaDDLDemo
{
def main(args: Array[String]): Unit =
{
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val tEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
val createTable =
"""
|CREATE TABLE PERSON (
| name VARCHAR COMMENT '姓名',
| age VARCHAR COMMENT '年龄',
| city VARCHAR COMMENT '所在城市',
| address VARCHAR COMMENT '家庭住址',
| ts TIMESTAMP(3) COMMENT '时间戳'
|)
|WITH (
| 'connector.type' = 'kafka', -- 使用 kafka connector
| 'connector.version' = 'universal', -- kafka 版本
| 'connector.topic' = 'kafka_ddl', -- kafka topic
| 'connector.startup-mode' = 'earliest-offset', -- 从最早的 offset 开始读取
| 'connector.properties.0.key' = 'zookeeper.connect', -- 连接信息
| 'connector.properties.0.value' = 'Desktop:2181',
| 'connector.properties.1.key' = 'bootstrap.servers',
| 'connector.properties.1.value' = 'Desktop:9091',
| 'update-mode' = 'append',
| 'format.type' = 'json', -- 数据源格式为 json
| 'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
|)
""".stripMargin
tEnv.executeSql(createTable)
val query: String ="""SELECT name,COUNT(age) FROM PERSON GROUP BY name""".stripMargin
val result: Table = tEnv.sqlQuery(query)
tEnv.toRetractStream[Row](result).print()
// tEnv.execute("Flink SQL DDL")
}
}