I'm trying to write a Java unit test for a Flink SQL application using the Flink mini cluster, but I can't manage to create an input table that has both nested fields and time characteristics.

I had a look at the documentation and examples below, but I'm still struggling:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java

Consider, for example, this simple expression that I want to test, which depends on the nested field "created.group_id" and expects "event_time" to be the row time:

    var createTableDDl = ""
        + " CREATE TEMPORARY VIEW postCreated10min                              \n"
        + " AS                                                                  \n"
        + " SELECT                                                              \n"
        + "   created.group_id as groupId,                                      \n"
        + "   TUMBLE_END(event_time, INTERVAL '10' MINUTES) as metricTime,      \n"
        + "   TUMBLE_ROWTIME(event_time, INTERVAL '10' MINUTES) as rowTime,     \n"
        + "   count(1) as metricValue                                           \n"
        + " FROM post_events_kafka                                              \n"
        + " GROUP BY                                                            \n"
        + "   created.group_id,                                                 \n"
        + "   TUMBLE(event_time, INTERVAL '10' MINUTES)                         \n";
    tableEnv.executeSql(createTableDDl);

In a unit test, the following syntax lets me create test input data with nested fields, but I have not found how to specify the row time or watermarks with this approach:

    Table testTable = tableEnv.fromValues(
        DataTypes.ROW(
            DataTypes.FIELD("created",
                DataTypes.ROW(
                    DataTypes.FIELD("group_id", DataTypes.STRING())
                )
            ),
            DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3))
        ),
        row(row("group123"), "2021-02-03 11:36:20"),
        row(row("group123"), "2021-02-03 11:38:20"),
        row(row("group123"), "2021-02-03 11:40:20")
    );
    tableEnv.createTemporaryView("post_events_kafka", testTable);

I have also tried the following syntax, which lets me specify the watermark and row time, but I have not found how to create a nested field with this approach:

    var testData = List.of(
        Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:36:20")),
        Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:38:20")),
        Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:40:20"))
    );
    var testStream = streamEnv
        .fromCollection(testData)
        .assignTimestampsAndWatermarks(WatermarkStrategy
            .<Tuple2<String, Timestamp>>forBoundedOutOfOrderness(Duration.ofMinutes(10))
            .withTimestampAssigner(
                TimestampAssignerSupplier.of((t2, t) -> t2.f1.getTime()))
        );
    var testDataTable = tableEnv.fromDataStream(
        testStream,
        $("group_id"), $("true_as_of"), $("event_time").rowtime()
    );
    tableEnv.createTemporaryView("post_events_kafka", testDataTable);

What am I missing?
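As a sanity check for the assertions in such a test, the expected result of the 10-minute tumbling count can be worked out by hand. The sketch below is plain Java with no Flink dependency. The class and method names are hypothetical, and it assumes tumbling windows aligned to the epoch and a single group, which matches the three test events used in this thread.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: computes the counts a tumbling-window query should produce. */
final class ExpectedTumbleCounts {

    /** End of the tumbling window that contains ts (assumes windows aligned to the epoch). */
    static LocalDateTime windowEnd(LocalDateTime ts, Duration size) {
        long millis = ts.toInstant(ZoneOffset.UTC).toEpochMilli();
        long start = millis - Math.floorMod(millis, size.toMillis());
        return LocalDateTime.ofInstant(
                Instant.ofEpochMilli(start + size.toMillis()), ZoneOffset.UTC);
    }

    /** Number of events per window, keyed by window end (single group only). */
    static Map<LocalDateTime, Long> countPerWindow(List<LocalDateTime> events, Duration size) {
        Map<LocalDateTime, Long> counts = new TreeMap<>();
        for (LocalDateTime ts : events) {
            counts.merge(windowEnd(ts, size), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<LocalDateTime> events = List.of(
                LocalDateTime.parse("2021-02-03T11:36:20"),
                LocalDateTime.parse("2021-02-03T11:38:20"),
                LocalDateTime.parse("2021-02-03T11:40:20"));
        // prints {2021-02-03T11:40=2, 2021-02-03T11:50=1}
        System.out.println(countPerWindow(events, Duration.ofMinutes(10)));
    }
}
```

For the three events above this gives two windows: the one ending at 11:40 holds two events and the one ending at 11:50 holds one. TUMBLE_ROWTIME for each window is the window end minus one millisecond.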
|
I found an answer to my own question!

For future reference, the snippet below creates a SQL table with a nested field and a watermark, filled with hard-coded values, which is all I need in order to test SQL expressions.

It's quite a mouthful though; is there a more succinct way to express the same thing?
On Thu, 29 Apr 2021, at 7:04 PM, Svend wrote:
|
Hi,
there are multiple ways to create a table for testing:

- use the datagen connector
- use the filesystem connector with CSV data
- and beginning with Flink 1.13, your code snippet becomes much simpler

Regards,
Timo

On 29.04.21 20:35, Svend wrote:
> I found an answer to my own question!
>
> For future reference, the snippet below creates a SQL table with a nested
> field and a watermark, filled with hard-coded values, which is all I need
> in order to test SQL expressions.
>
> It's quite a mouthful though; is there a more succinct way to express the
> same thing?
>
> var testData = List.of(
>     Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:36:20")),
>     Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:38:20")),
>     Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:40:20"))
> );
> var testStream = streamEnv
>     .fromCollection(testData,
>         Types.ROW_NAMED(new String[] {"created", "event_time"},
>             Types.ROW_NAMED(new String[] {"group_id"}, Types.STRING),
>             Types.SQL_TIMESTAMP
>         )
>     )
>     .assignTimestampsAndWatermarks(WatermarkStrategy
>         .<Row>forBoundedOutOfOrderness(Duration.ofMinutes(10))
>         .withTimestampAssigner(
>             TimestampAssignerSupplier.of((t2, t) -> ((Timestamp) (t2.getField(1))).getTime()))
>     );
> var testDataTable = tableEnv.fromDataStream(testStream, $("created"), $("event_time").rowtime());
> tableEnv.createTemporaryView("post_events_kafka", testDataTable);
>
> On Thu, 29 Apr 2021, at 7:04 PM, Svend wrote:
>> [...]
>> I had a look at the documentation and examples below, although I'm
>> still struggling:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
>> [...]
|
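The filesystem-plus-CSV suggestion could look roughly like the sketch below. It only writes the test data and builds the DDL string, so it runs without Flink on the classpath; the class name, method names and file name are hypothetical. It assumes that Flink's CSV format can read a ROW column (row elements are separated by the array element delimiter, so a one-field row is just the plain value), which is worth verifying against the Flink version in use.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

/** Hypothetical helper: CSV-backed test table with a nested field and a watermark. */
final class CsvTestTable {

    /** Writes the three test events, one "group_id,event_time" line per event. */
    static Path writeEvents(Path dir) throws IOException {
        Path csv = dir.resolve("post_events.csv");
        return Files.write(csv, List.of(
                "group123,2021-02-03 11:36:20",
                "group123,2021-02-03 11:38:20",
                "group123,2021-02-03 11:40:20"), StandardCharsets.UTF_8);
    }

    /** Builds the CREATE TABLE statement; assumes the CSV format accepts the ROW column. */
    static String createTableDdl(Path csv) {
        return String.join("\n",
                "CREATE TEMPORARY TABLE post_events_kafka (",
                "  created ROW<group_id STRING>,",
                "  event_time TIMESTAMP(3),",
                "  WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE",
                ") WITH (",
                "  'connector' = 'filesystem',",
                "  'path' = '" + csv.toUri() + "',",
                "  'format' = 'csv'",
                ")");
    }

    public static void main(String[] args) throws IOException {
        Path csv = writeEvents(Files.createTempDirectory("flink-sql-test"));
        // In the unit test itself: tableEnv.executeSql(createTableDdl(csv));
        System.out.println(createTableDdl(csv));
    }
}
```

Declaring the watermark in the DDL keeps the whole test input in SQL, so the DataStream type information and timestamp assigner from the earlier snippet are no longer needed.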
Thanks for the feedback.

The CSV is a good idea and will make my tests more readable; I'll use that.

Looking forward to Flink 1.13!

Svend

On Fri, 30 Apr 2021, at 9:09 AM, Timo Walther wrote: