Why data didn't enter the time window in EventTime mode

Soheil Pourbafrani
Hi,

In a DataStream job, the source emits a record every 8 milliseconds, and each record carries its timestamp as a field. With Flink's default time characteristic (processing time), records enter the time window as expected, but when I set the time characteristic to EventTime, the job outputs nothing. Here is the code:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

SingleOutputStreamOperator<Tuple3<String, Long, JSONObject>> res = aggregatedTuple
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, JSONObject>>(Time.milliseconds(8)) {
            @Override
            public long extractTimestamp(Tuple3<String, Long, JSONObject> element) {
                return element.f1;
            }
        })
    .keyBy(1)
    .timeWindow(Time.milliseconds(8))
    .allowedLateness(Time.milliseconds(3))
    .sideOutputLateData(lateOutputTag)
    .reduce(processing...
    );

DataStream<Tuple3<String, Long, JSONObject>> lateData = res.getSideOutput(lateOutputTag);
res.print();

What is the problem with my code?
According to the Flink documentation, my understanding of EventTime is as follows: with a time window, when a new record arrives, Flink starts a new (logical) window based on that record's event timestamp, waits 8 milliseconds (according to my code) to see whether any other records with the same key arrive, and triggers the window 8 milliseconds after the timestamp of its first element. Since the source generates data at a constant interval, I also set the watermark bound to 8 milliseconds. Is my understanding of Flink windows in EventTime correct?

Re: Why data didn't enter the time window in EventTime mode

Hequn Cheng
Hi Soheil,

"wait 8 milliseconds (according to my code) to see if any other data with the same key received or not and after 8 millisecond it will be triggered."
Yes, but the time here is event time, so if no data arrives from the source, time will not advance.
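
To make the point about event time concrete, here is a minimal plain-Java sketch (no Flink dependency) of how a bounded-out-of-orderness watermark drives tumbling windows. It assumes the watermark is the largest timestamp seen so far minus the bound, that tumbling windows are aligned to multiples of the window size rather than to the first element, and that a window fires once the watermark reaches its last millisecond. The class and method names are made up for illustration, and the sketch ignores keys, allowed lateness, and the periodic emission of watermarks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class EventTimeSketch {

    // Start of the tumbling window that contains a non-negative timestamp.
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Feeds in-order timestamps and returns the starts of the windows that fired.
    static List<Long> firedWindows(long[] timestamps, long size, long bound) {
        TreeSet<Long> open = new TreeSet<>();
        List<Long> fired = new ArrayList<>();
        long maxTs = Long.MIN_VALUE;
        for (long ts : timestamps) {
            open.add(windowStart(ts, size));
            maxTs = Math.max(maxTs, ts);
            // Event time only moves forward when a record arrives.
            long watermark = maxTs - bound;
            // A window [start, start + size) fires once the watermark
            // reaches its last millisecond, start + size - 1.
            while (!open.isEmpty() && open.first() + size - 1 <= watermark) {
                fired.add(open.pollFirst());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Four records, 8 ms apart, with an 8 ms window and an 8 ms bound.
        System.out.println(firedWindows(new long[] {3, 11, 19, 27}, 8, 8)); // prints [0, 8]
    }
}
```

In this run the record with timestamp 3 lands in the window starting at 0, not at 3, and the windows starting at 16 and 24 never fire because no later record arrives to push the watermark past them.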

There are several possible reasons why event time has not advanced:
1. No data is arriving from the source.
2. One of the parallel source instances has no data.
3. The time field, i.e. the Long in the Tuple3, must be in milliseconds, not seconds.
4. The data must span a longer time range than the window size for event time to advance.
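
Reasons 3 and 4 can be illustrated with a small plain-Java sketch (no Flink dependency). It counts how many records, arriving 8 ms apart, are needed before the first 8 ms window can fire, once with timestamps in milliseconds and once with the same instants truncated to seconds. The start instant, the helper names, and the firing rule (watermark = largest timestamp minus the bound, window fires when the watermark reaches its last millisecond) are assumptions made for the illustration, not taken from the job in question.

```java
public class FirstFiringSketch {

    // Start of the tumbling window that contains a non-negative timestamp.
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Number of records consumed before the first record's window fires, or -1.
    static int recordsUntilFirstFiring(long[] timestamps, long size, long bound) {
        long firstWindowEnd = windowStart(timestamps[0], size) + size;
        long maxTs = Long.MIN_VALUE;
        for (int i = 0; i < timestamps.length; i++) {
            maxTs = Math.max(maxTs, timestamps[i]);
            long watermark = maxTs - bound;
            if (watermark >= firstWindowEnd - 1) {
                return i + 1;
            }
        }
        return -1; // the data never spans enough event time
    }

    // Records every 8 ms from startMillis; divisor 1000 truncates to seconds.
    static long[] everyEightMillis(long startMillis, int count, long divisor) {
        long[] out = new long[count];
        for (int i = 0; i < count; i++) {
            out[i] = (startMillis + 8L * i) / divisor;
        }
        return out;
    }

    public static void main(String[] args) {
        long start = 1_531_900_000_000L; // hypothetical instant in July 2018
        long[] millis = everyEightMillis(start, 2000, 1);
        long[] seconds = everyEightMillis(start, 2000, 1000);
        System.out.println(recordsUntilFirstFiring(millis, 8, 8));  // prints 3
        System.out.println(recordsUntilFirstFiring(seconds, 8, 8)); // prints 1876
    }
}
```

Under these assumptions, millisecond timestamps produce a first result after three records, while second-resolution timestamps need about 15 seconds of input, because Flink still interprets the values as milliseconds. A short test run with second-resolution timestamps would therefore appear to output nothing.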

Best, Hequn

On Wed, Jul 18, 2018 at 3:53 PM, Soheil Pourbafrani wrote:
[...]

Re: Why data didn't enter the time window in EventTime mode

Fabian Hueske
Hi Soheil,

Hequn is right. This might be an issue with event time not advancing.
You can monitor that by checking the watermarks in the web dashboard, or print-debug it with a ProcessFunction, which can look up the current watermark.

Best, Fabian

2018-07-19 3:30 GMT+02:00 Hequn Cheng:
[...]

Re: Why data didn't enter the time window in EventTime mode

Hequn Cheng
Hi Soheil,
You can monitor the watermarks in the web dashboard, as Fabian said. There is some documentation here [1].


On Thu, Jul 19, 2018 at 3:53 PM, Fabian Hueske wrote:
[...]