Table API connect method timestamp watermark assignment problem

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Table API connect method timestamp watermark assignment problem

Lu Weizheng
Hey guys,

I am using Flink Table API recently. I want to use EventTime and use a Kakfa Table Connector. I think in my code Flink cannot recognize event time timestamp field. Here is my code :

public static void main(String[] args) throws Exception {

        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, fsSettings);

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        tEnv
            // 使用connect函数连接外部系统
            .connect(
                new Kafka()
                .version("universal")     // 必填,合法的参数有"0.8", "0.9", "0.10", "0.11"或"universal"
                .topic("user_behavior")   // 必填,Topic名
                .startFromLatest()        // 首次消费时数据读取的位置
                .property("zookeeper.connect", "localhost:2181")  // Kafka连接参数
                .property("bootstrap.servers", "localhost:9092")
            )
            // 序列化方式 可以是JSON、Avro等
            .withFormat(new Json())
            // 数据的Schema
            .withSchema(
                new Schema()
                    .field("user_id", DataTypes.BIGINT())
                    .field("item_id", DataTypes.BIGINT())
                    .field("category_id", DataTypes.BIGINT())
                    .field("behavior", DataTypes.STRING())
                    .field("ts", DataTypes.TIMESTAMP(3))
                    .rowtime(new Rowtime().timestampsFromField("ts").watermarksPeriodicAscending())
            )
            // 临时表的表名,后续可以在SQL语句中使用这个表名
            .createTemporaryTable("user_behavior");

        Table tumbleGroupByUserId = tEnv.sqlQuery("SELECT \n" +
                "\tuser_id, \n" +
                "\tCOUNT(behavior) AS behavior_cnt, \n" +
                "\tTUMBLE_END(ts, INTERVAL '10' SECOND) AS end_ts \n" +
                "FROM user_behavior\n" +
                "GROUP BY user_id, TUMBLE(ts, INTERVAL '10' SECOND)");
        DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(tumbleGroupByUserId, Row.class);
        result.print();

        env.execute("table api");
    }

As shown in the code above, I use rowtime() method when I want to define a Schema. When I try to run, I get the following error: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.

​I tried another method based on a DLL, and it worked. So it is not my Kafka source problem.

tEnv.sqlUpdate("CREATE TABLE user_behavior (\n" +
                "    user_id BIGINT,\n" +
                "    item_id BIGINT,\n" +
                "    category_id BIGINT,\n" +
                "    behavior STRING,\n" +
                "    ts TIMESTAMP(3),\n" +
//                "    proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列\n" +
                "    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- 在ts上定义watermark,ts成为事件时间列\n" +
                ") WITH (\n" +
                "    'connector.type' = 'kafka',  -- 使用 kafka connector\n" +
                "    'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本\n" +
                "    'connector.topic' = 'user_behavior',  -- kafka topic\n" +
                "    'connector.startup-mode' = 'latest-offset',  -- 从起始 offset 开始读取\n" +
                "    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper 地址\n" +
                "    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker 地址\n" +
                "    'format.type' = 'json'  -- 数据源格式为 json\n" +
                ")");

Hope anyone can give some suggestions. Thanks.
Reply | Threaded
Open this post in threaded view
|

Re: Table API connect method timestamp watermark assignment problem

Jark Wu-3
Hi Lu,

DDL and Schema descriptor do not share the same code path. I guess the reason why Schema descriptor doesn't work is because of FLINK-16160.
We will fix that in the next minor release. Please use DDL to define watermark which is also the suggested way to do that.
The current Schema descriptor will be refactored to share the same code path of DDL in the near future. 

Best,
Jark


On Tue, 3 Mar 2020 at 10:09, Lu Weizheng <[hidden email]> wrote:
Hey guys,

I am using Flink Table API recently. I want to use EventTime and use a Kakfa Table Connector. I think in my code Flink cannot recognize event time timestamp field. Here is my code :

public static void main(String[] args) throws Exception {

        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, fsSettings);

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        tEnv
            // 使用connect函数连接外部系统
            .connect(
                new Kafka()
                .version("universal")     // 必填,合法的参数有"0.8", "0.9", "0.10", "0.11"或"universal"
                .topic("user_behavior")   // 必填,Topic名
                .startFromLatest()        // 首次消费时数据读取的位置
                .property("zookeeper.connect", "localhost:2181")  // Kafka连接参数
                .property("bootstrap.servers", "localhost:9092")
            )
            // 序列化方式 可以是JSON、Avro等
            .withFormat(new Json())
            // 数据的Schema
            .withSchema(
                new Schema()
                    .field("user_id", DataTypes.BIGINT())
                    .field("item_id", DataTypes.BIGINT())
                    .field("category_id", DataTypes.BIGINT())
                    .field("behavior", DataTypes.STRING())
                    .field("ts", DataTypes.TIMESTAMP(3))
                    .rowtime(new Rowtime().timestampsFromField("ts").watermarksPeriodicAscending())
            )
            // 临时表的表名,后续可以在SQL语句中使用这个表名
            .createTemporaryTable("user_behavior");

        Table tumbleGroupByUserId = tEnv.sqlQuery("SELECT \n" +
                "\tuser_id, \n" +
                "\tCOUNT(behavior) AS behavior_cnt, \n" +
                "\tTUMBLE_END(ts, INTERVAL '10' SECOND) AS end_ts \n" +
                "FROM user_behavior\n" +
                "GROUP BY user_id, TUMBLE(ts, INTERVAL '10' SECOND)");
        DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(tumbleGroupByUserId, Row.class);
        result.print();

        env.execute("table api");
    }

As shown in the code above, I use rowtime() method when I want to define a Schema. When I try to run, I get the following error: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.

​I tried another method based on a DLL, and it worked. So it is not my Kafka source problem.

tEnv.sqlUpdate("CREATE TABLE user_behavior (\n" +
                "    user_id BIGINT,\n" +
                "    item_id BIGINT,\n" +
                "    category_id BIGINT,\n" +
                "    behavior STRING,\n" +
                "    ts TIMESTAMP(3),\n" +
//                "    proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列\n" +
                "    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- 在ts上定义watermark,ts成为事件时间列\n" +
                ") WITH (\n" +
                "    'connector.type' = 'kafka',  -- 使用 kafka connector\n" +
                "    'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本\n" +
                "    'connector.topic' = 'user_behavior',  -- kafka topic\n" +
                "    'connector.startup-mode' = 'latest-offset',  -- 从起始 offset 开始读取\n" +
                "    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper 地址\n" +
                "    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker 地址\n" +
                "    'format.type' = 'json'  -- 数据源格式为 json\n" +
                ")");

Hope anyone can give some suggestions. Thanks.
Reply | Threaded
Open this post in threaded view
|

回复: Table API connect method timestamp watermark assignment problem

Lu Weizheng
Thanks a lot, hope it will be fixed soon!

发件人: Jark Wu <[hidden email]>
发送时间: 2020年3月3日 11:25
收件人: Lu Weizheng <[hidden email]>
抄送: [hidden email] <[hidden email]>
主题: Re: Table API connect method timestamp watermark assignment problem
 
Hi Lu,

DDL and Schema descriptor do not share the same code path. I guess the reason why Schema descriptor doesn't work is because of FLINK-16160.
We will fix that in the next minor release. Please use DDL to define watermark which is also the suggested way to do that.
The current Schema descriptor will be refactored to share the same code path of DDL in the near future. 

Best,
Jark


On Tue, 3 Mar 2020 at 10:09, Lu Weizheng <[hidden email]> wrote:
Hey guys,

I am using Flink Table API recently. I want to use EventTime and use a Kakfa Table Connector. I think in my code Flink cannot recognize event time timestamp field. Here is my code :

public static void main(String[] args) throws Exception {

        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, fsSettings);

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        tEnv
            // 使用connect函数连接外部系统
            .connect(
                new Kafka()
                .version("universal")     // 必填,合法的参数有"0.8", "0.9", "0.10", "0.11"或"universal"
                .topic("user_behavior")   // 必填,Topic名
                .startFromLatest()        // 首次消费时数据读取的位置
                .property("zookeeper.connect", "localhost:2181")  // Kafka连接参数
                .property("bootstrap.servers", "localhost:9092")
            )
            // 序列化方式 可以是JSON、Avro等
            .withFormat(new Json())
            // 数据的Schema
            .withSchema(
                new Schema()
                    .field("user_id", DataTypes.BIGINT())
                    .field("item_id", DataTypes.BIGINT())
                    .field("category_id", DataTypes.BIGINT())
                    .field("behavior", DataTypes.STRING())
                    .field("ts", DataTypes.TIMESTAMP(3))
                    .rowtime(new Rowtime().timestampsFromField("ts").watermarksPeriodicAscending())
            )
            // 临时表的表名,后续可以在SQL语句中使用这个表名
            .createTemporaryTable("user_behavior");

        Table tumbleGroupByUserId = tEnv.sqlQuery("SELECT \n" +
                "\tuser_id, \n" +
                "\tCOUNT(behavior) AS behavior_cnt, \n" +
                "\tTUMBLE_END(ts, INTERVAL '10' SECOND) AS end_ts \n" +
                "FROM user_behavior\n" +
                "GROUP BY user_id, TUMBLE(ts, INTERVAL '10' SECOND)");
        DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(tumbleGroupByUserId, Row.class);
        result.print();

        env.execute("table api");
    }

As shown in the code above, I use rowtime() method when I want to define a Schema. When I try to run, I get the following error: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.

​I tried another method based on a DLL, and it worked. So it is not my Kafka source problem.

tEnv.sqlUpdate("CREATE TABLE user_behavior (\n" +
                "    user_id BIGINT,\n" +
                "    item_id BIGINT,\n" +
                "    category_id BIGINT,\n" +
                "    behavior STRING,\n" +
                "    ts TIMESTAMP(3),\n" +
//                "    proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列\n" +
                "    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- 在ts上定义watermark,ts成为事件时间列\n" +
                ") WITH (\n" +
                "    'connector.type' = 'kafka',  -- 使用 kafka connector\n" +
                "    'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本\n" +
                "    'connector.topic' = 'user_behavior',  -- kafka topic\n" +
                "    'connector.startup-mode' = 'latest-offset',  -- 从起始 offset 开始读取\n" +
                "    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper 地址\n" +
                "    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker 地址\n" +
                "    'format.type' = 'json'  -- 数据源格式为 json\n" +
                ")");

Hope anyone can give some suggestions. Thanks.