I have recently been using the Flink Table API. I want to use event time with a Kafka table connector, but it seems Flink does not recognize my event-time timestamp field. Here is my code:
public static void main(String[] args) throws Exception {
    EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, fsSettings);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    tEnv
        // connect to the external system via the connect() descriptor API
        .connect(
            new Kafka()
                .version("universal")   // required; valid values are "0.8", "0.9", "0.10", "0.11" or "universal"
                .topic("user_behavior") // required; topic name
                .startFromLatest()      // where to start reading on first consumption
                .property("zookeeper.connect", "localhost:2181") // Kafka connection properties
                .property("bootstrap.servers", "localhost:9092")
        )
        // serialization format, e.g. JSON or Avro
        .withFormat(new Json())
        // schema of the data
        .withSchema(
            new Schema()
                .field("user_id", DataTypes.BIGINT())
                .field("item_id", DataTypes.BIGINT())
                .field("category_id", DataTypes.BIGINT())
                .field("behavior", DataTypes.STRING())
                .field("ts", DataTypes.TIMESTAMP(3))
                .rowtime(new Rowtime().timestampsFromField("ts").watermarksPeriodicAscending())
        )
        // name of the temporary table, usable in later SQL statements
        .createTemporaryTable("user_behavior");

    Table tumbleGroupByUserId = tEnv.sqlQuery("SELECT \n" +
        "\tuser_id, \n" +
        "\tCOUNT(behavior) AS behavior_cnt, \n" +
        "\tTUMBLE_END(ts, INTERVAL '10' SECOND) AS end_ts \n" +
        "FROM user_behavior\n" +
        "GROUP BY user_id, TUMBLE(ts, INTERVAL '10' SECOND)");

    DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(tumbleGroupByUserId, Row.class);
    result.print();
    env.execute("table api");
}
As shown in the code above, I call the rowtime() method when defining the Schema. When I try to run the job, I get the following error:

Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.
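The error suggests that ts is treated as a plain TIMESTAMP(3) column instead of a rowtime attribute. To check this, I print the table's schema after registering it; as far as I understand, a proper event-time attribute should be marked as *ROWTIME* in the printed schema. A minimal diagnostic sketch, assuming the table has been registered as above:

```java
// Diagnostic sketch: inspect the schema of the registered table.
// If "ts" were a proper event-time attribute, I would expect it to be
// printed as TIMESTAMP(3) *ROWTIME*; here it appears as a plain
// TIMESTAMP(3), which matches the error message above.
tEnv.sqlQuery("SELECT * FROM user_behavior").printSchema();
```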
I also tried defining the same table with SQL DDL instead of the descriptor API:

tEnv.sqlUpdate("CREATE TABLE user_behavior (\n" +
    "    user_id BIGINT,\n" +
    "    item_id BIGINT,\n" +
    "    category_id BIGINT,\n" +
    "    behavior STRING,\n" +
    "    ts TIMESTAMP(3),\n" +
    // "    proctime AS PROCTIME(),  -- generate a processing-time column via a computed column\n" +
    "    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- define a watermark on ts, making it an event-time column\n" +
    ") WITH (\n" +
    "    'connector.type' = 'kafka',  -- use the kafka connector\n" +
    "    'connector.version' = 'universal',  -- kafka version; universal supports 0.11 and above\n" +
    "    'connector.topic' = 'user_behavior',  -- kafka topic\n" +
    "    'connector.startup-mode' = 'latest-offset',  -- start reading from the latest offset\n" +
    "    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address\n" +
    "    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address\n" +
    "    'format.type' = 'json'  -- the source format is json\n" +
    ")");
I hope someone can give me some suggestions. Thanks.