Problem creating tumbling windows based on number of rows

A. V.

Hi,

I am trying to create a tumbling window of 2 rows each in Flink (Java). The window must be based on the dateTime (TIMESTAMP(3)) or unixDateTime (BIGINT) column. Below are two versions of my code; the error message each one produces is in the comment above it.

When I print the data types of the Table object I see this:

 |-- mID: INT
 |-- dateTime: TIMESTAMP(3) *ROWTIME*
 |-- mValue: DOUBLE
 |-- unixDateTime: BIGINT
 |-- mType: STRING

StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv);
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

TupleTypeInfo<Tuple5<Integer, Timestamp, Double, Long, String>> tupleType = new TupleTypeInfo<>(
    Types.INT(),
    Types.SQL_TIMESTAMP(),
    Types.DOUBLE(),
    Types.LONG(),
    Types.STRING());
DataStream<Tuple5<Integer, Timestamp, Double, Long, String>> dsTuple =
    tableEnv.toAppendStream(HTable, tupleType);

// When I run the code below I get this error:
// Caused by: java.lang.RuntimeException: Rowtime timestamp is null. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic.

Table table = tableEnv.fromDataStream(dsTuple, "mID, dateTime.rowtime, mValue, unixDateTime, mType");
DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class);
stream.print();

// When I run the code below I get this error:
// Exception in thread "main" java.lang.UnsupportedOperationException: Event-time grouping windows on row intervals are currently not supported.

// Field names match the schema printed above (mValue, mType), so that select("AVG(mValue)") can resolve.
Table table = tableEnv.fromDataStream(dsTuple, "mID, dateTime.rowtime, mValue, unixDateTime, mType")
    .window(Tumble.over("2.rows")
        .on("dateTime")
        .as("a"))
    .groupBy("a")
    .select("AVG(mValue)");
DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class);
stream.print();

Re: Problem creating tumbling windows based on number of rows

Manoj Kumar
Hi A.V.,

//When I run below code I get this error: Caused by: java.lang.RuntimeException: Rowtime timestamp is null.
//Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic.


You need to assign timestamps and watermarks to the DataStream before converting it to a Table.

Sample DataStream code:

// Splitter2 (not shown) is assumed to be a FlatMapFunction that emits Tuple2<String, Timestamp>.
DataStream<Tuple2<String, Timestamp>> lineitem = bsEnv
    .socketTextStream("localhost", 12347)
    .flatMap(new Splitter2())
    .assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<Tuple2<String, Timestamp>>() {
          @Override
          public long extractAscendingTimestamp(Tuple2<String, Timestamp> inputrowtuple) {
            // A Tuple2 only has fields f0 and f1; the Timestamp is in f1.
            return inputrowtuple.f1.getTime();
          }
        });
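
To illustrate what the extractor above contributes, here is a minimal plain-Java sketch of the idea: each record's event time is read from one of its fields, and the watermark trails the highest timestamp seen so far by one millisecond. This reflects my understanding of how an ascending-timestamp extractor behaves; it is not Flink's implementation, and the class and method names are hypothetical.

```java
// Plain-Java illustration only; it does not use or reproduce Flink's code.
final class AscendingWatermarkSketch {

    // Highest event timestamp (epoch millis) seen so far.
    private long maxTimestamp = Long.MIN_VALUE;

    // Takes the event time read from a record field (for example the result
    // of Timestamp.getTime()) and returns it as the record's timestamp.
    long extractTimestamp(long eventTimeMillis) {
        if (eventTimeMillis > maxTimestamp) {
            maxTimestamp = eventTimeMillis;
        }
        return eventTimeMillis;
    }

    // The watermark trails the highest timestamp seen so far by 1 ms.
    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 1;
    }

    public static void main(String[] args) {
        AscendingWatermarkSketch sketch = new AscendingWatermarkSketch();
        sketch.extractTimestamp(1000L);
        sketch.extractTimestamp(2000L);
        System.out.println(sketch.currentWatermark()); // prints 1999
    }
}
```

Without such a step the records carry no event timestamp at all, which is consistent with the "Rowtime timestamp is null" error quoted above.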


//When I run below code I get this error: Exception in thread "main" java.lang.UnsupportedOperationException: Event-time grouping windows on row intervals are currently not supported.


Currently, windows on row intervals are only supported for processing time. Change .rowtime to .proctime in the schema.
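
For reference, this is what a tumbling window of 2 rows with AVG(mValue) is expected to compute, written as a minimal plain-Java sketch that does not depend on Flink. The class name, method name, and sample values are made up for illustration, and dropping a trailing incomplete window is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; it does not use or reproduce Flink's API.
final class TumblingCountWindowSketch {

    // Splits the values, in arrival order, into non-overlapping windows of
    // `size` rows and returns the average of each complete window.
    // A trailing incomplete window is dropped (an assumption of this sketch).
    static List<Double> averagePerWindow(List<Double> values, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<Double> averages = new ArrayList<>();
        for (int start = 0; start + size <= values.size(); start += size) {
            double sum = 0.0;
            for (int i = start; i < start + size; i++) {
                sum += values.get(i);
            }
            averages.add(sum / size);
        }
        return averages;
    }

    public static void main(String[] args) {
        // Five hypothetical mValue readings; the fifth has no partner row.
        List<Double> mValues = Arrays.asList(1.0, 3.0, 10.0, 20.0, 7.0);
        System.out.println(averagePerWindow(mValues, 2)); // prints [2.0, 15.0]
    }
}
```

Because the grouping depends on arrival order, the result with processing time can differ from what ordering by dateTime would give if rows arrive out of order.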

In reply to A. V.'s message of Wed, Oct 23, 2019 at 1:11 PM.

--
Regards,
Manoj Kumar