How to create a java unit test for Flink SQL with nested field and watermark?

Svend
I'm trying to write a Java unit test for a Flink SQL application using the Flink mini cluster, but I cannot manage to create an input table that has both nested fields and time characteristics.

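I had a look at the documentation and examples below, but I'm still struggling:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java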

Consider, for example, this simple expression that I want to test. It depends on the nested field "created.group_id" and expects "event_time" to be the row time:


var createTableDDl = ""
        + " CREATE TEMPORARY VIEW postCreated10min                                      \n"
        + " AS                                                                          \n"
        + " SELECT                                                                      \n"
        + "   created.group_id as groupId,                                              \n"
        + "   TUMBLE_END(event_time, INTERVAL '10' MINUTES) as metricTime,              \n"
        + "   TUMBLE_ROWTIME(event_time, INTERVAL '10' MINUTES) as rowTime,             \n"
        + "   count(1) as metricValue                                                   \n"
        + " FROM post_events_kafka                                                      \n"
        + " GROUP BY                                                                    \n"
        + "   created.group_id,                                                         \n"
        + "   TUMBLE(event_time, INTERVAL '10' MINUTES)                                 \n";
tableEnv.executeSql(createTableDDl);


In a unit test, the following syntax lets me create test input data with nested fields, but I have not found how to specify the row time or watermarks with this approach:


Table testTable = tableEnv.fromValues(
  DataTypes.ROW(
    DataTypes.FIELD("created",
      DataTypes.ROW(
        DataTypes.FIELD("group_id", DataTypes.STRING())
      )
    ),
    DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3))
  ),
 
  row(row("group123"), "2021-02-03 11:36:20"),
  row(row("group123"), "2021-02-03 11:38:20"),
  row(row("group123"), "2021-02-03 11:40:20")
);
tableEnv.createTemporaryView("post_events_kafka", testTable);


I have also tried the following syntax, which lets me specify a watermark and a row time, but I have not found how to create a nested field with this approach:


var testData = List.of(
  Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:36:20")),
  Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:38:20")),
  Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:40:20"))
);
var testStream = streamEnv
  .fromCollection(testData)
  .assignTimestampsAndWatermarks(WatermarkStrategy
  .<Tuple2<String, Timestamp>>forBoundedOutOfOrderness(Duration.ofMinutes(10))
  .withTimestampAssigner(
    TimestampAssignerSupplier.of((t2, t) -> t2.f1.getTime()))
  );
var testDataTable = tableEnv.fromDataStream(
  testStream,
  $("group_id"), $("true_as_of"), $("event_time").rowtime()
);
tableEnv.createTemporaryView("post_events_kafka", testDataTable);



What am I missing?

Re: How to create a java unit test for Flink SQL with nested field and watermark?

Svend
I found an answer to my own question!

For future reference, the snippet below creates a SQL table with a nested field and a watermark, filled with hard-coded values, which is all I need in order to test SQL expressions.

It's quite a mouthful though. Is there a more succinct way to express the same thing?


var testData = List.of(
  Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:36:20")),
  Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:38:20")),
  Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:40:20"))
);
var testStream = streamEnv
  .fromCollection(testData,
    // Explicit type info: a nested row "created" plus a timestamp column.
    Types.ROW_NAMED(new String[] {"created", "event_time"},
      // The nested field name must match "created.group_id" used in the SQL view.
      Types.ROW_NAMED(new String[] {"group_id"}, Types.STRING),
      Types.SQL_TIMESTAMP
    )
  )
  .assignTimestampsAndWatermarks(WatermarkStrategy
    .<Row>forBoundedOutOfOrderness(Duration.ofMinutes(10))
    .withTimestampAssigner(
      // Use the second field (event_time) as the event timestamp, in epoch millis.
      TimestampAssignerSupplier.of((event, ts) -> ((Timestamp) event.getField(1)).getTime()))
  );
// Expose "event_time" as the rowtime attribute of the table.
var testDataTable = tableEnv.fromDataStream(testStream, $("created"), $("event_time").rowtime());
tableEnv.createTemporaryView("post_events_kafka", testDataTable);
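
As a sanity check for the assertions in such a test, the expected content of postCreated10min can be worked out by hand. The sketch below is plain Java with no Flink dependency. It assumes 10-minute tumbling windows aligned to the epoch and treats the timestamps as zone-less date-times; the class and method names are made up for illustration.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TumbleWindowSketch {

    /** End (exclusive) of the epoch-aligned tumbling window that contains ts. */
    static LocalDateTime windowEnd(LocalDateTime ts, Duration size) {
        long millis = ts.toInstant(ZoneOffset.UTC).toEpochMilli();
        long sizeMillis = size.toMillis();
        // Round down to the window start, then add one window length.
        long start = Math.floorDiv(millis, sizeMillis) * sizeMillis;
        return LocalDateTime.ofEpochSecond((start + sizeMillis) / 1000, 0, ZoneOffset.UTC);
    }

    /** Counts events per window end, mirroring count(1) grouped by TUMBLE(...). */
    static Map<LocalDateTime, Long> countPerWindow(List<LocalDateTime> events, Duration size) {
        Map<LocalDateTime, Long> counts = new TreeMap<>();
        for (LocalDateTime ts : events) {
            counts.merge(windowEnd(ts, size), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same three event times as the hard-coded test rows.
        List<LocalDateTime> events = List.of(
            LocalDateTime.parse("2021-02-03T11:36:20"),
            LocalDateTime.parse("2021-02-03T11:38:20"),
            LocalDateTime.parse("2021-02-03T11:40:20"));
        System.out.println(countPerWindow(events, Duration.ofMinutes(10)));
    }
}
```

For the three rows above this prints {2021-02-03T11:40=2, 2021-02-03T11:50=1}: two events in the window ending at 11:40 and one in the window ending at 11:50, which is what metricTime and metricValue should contain for "group123".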





Re: How to create a java unit test for Flink SQL with nested field and watermark?

Timo Walther
Hi,

There are multiple ways to create a table for testing:

- use the datagen connector
- use the filesystem connector with CSV data
- starting with Flink 1.13, your code snippet becomes much simpler

Regards,
Timo
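
To illustrate the first option, here is a rough sketch of a table definition that declares the nested field and the watermark directly in SQL DDL. It only builds the statement as a string, in the same style as the view definition earlier in the thread. It assumes that the datagen connector in your Flink version can generate ROW columns and that random values are acceptable for the test; the class and method names are hypothetical.

```java
class DatagenDdlSketch {

    /**
     * Builds a CREATE TABLE statement for a test table with a nested ROW
     * column and an event-time watermark that tolerates 10 minutes of lateness.
     */
    static String createTableDdl(String tableName, int numberOfRows) {
        return String.join("\n",
            "CREATE TEMPORARY TABLE " + tableName + " (",
            "  created ROW<group_id STRING>,",
            "  event_time TIMESTAMP(3),",
            "  WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE",
            ") WITH (",
            "  'connector' = 'datagen',",
            // A fixed row count makes the source bounded, so the test terminates.
            "  'number-of-rows' = '" + numberOfRows + "'",
            ")");
    }

    public static void main(String[] args) {
        String ddl = createTableDdl("post_events_kafka", 100);
        System.out.println(ddl);
        // In a Flink test, this string would be passed to tableEnv.executeSql(ddl).
    }
}
```

Because datagen produces random values, this variant suits smoke tests of the SQL more than assertions on exact counts; for deterministic input, hard-coded rows or a CSV file remain the better fit.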


Re: How to create a java unit test for Flink SQL with nested field and watermark?

Svend
Thanks for the feedback.

The CSV approach is a good idea and will make my tests more readable; I'll use that.

Looking forward to Flink 1.13!

Svend



