Hi,
I wrote a program which constructs a WindowedStream to compute periodic data statistics every 10 seconds. However, I found that events are not strictly grouped into windows of 10s duration, i.e., some events leak into the adjacent window. The output looks like this:

    Mon, 04 Jul 2016 11:11:50 CST # 1
    Mon, 04 Jul 2016 11:11:50 CST # 2
    # removed for brevity
    Mon, 04 Jul 2016 11:11:59 CST # 99
    99 events in this window
    Mon, 04 Jul 2016 11:11:59 CST # This event has been put in the wrong window
    Mon, 04 Jul 2016 11:12:00 CST

Here is the code:

    import org.apache.commons.lang3.time.FastDateFormat;

It doesn't always happen, but if you run the program long enough it can be observed for sure. Adjusting the DELAY value of watermark generation does not change the behavior.
|
Hi,

I think it should be as simple as setting event time as the stream time characteristic:

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

The problem is that .timeWindow(Time.seconds(10)) will use processing time if you don't specify a time characteristic. You can also enforce an event-time window explicitly with this:

    stream.window(TumblingEventTimeWindows.of(Time.seconds(10)))

Cheers,
Aljoscha

On Mon, 4 Jul 2016 at 06:00 Yukun Guo <[hidden email]> wrote:
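To illustrate why the time characteristic matters here, below is a minimal plain-Java sketch (not Flink code). It assumes tumbling windows aligned to multiples of the window size, and the class name, method name, and sample timestamps are all hypothetical. It shows how an event stamped just before a window boundary lands in the next window when it is assigned by arrival (processing) time instead of by its own timestamp.

```java
final class WindowAssignmentSketch {
    // Assumed window size of 10 seconds, in milliseconds.
    static final long WINDOW_SIZE_MS = 10_000L;

    // Start of the tumbling window that contains the given non-negative time.
    // This is a simplification for illustration, not Flink's implementation.
    static long windowStart(long timeMs) {
        return timeMs - (timeMs % WINDOW_SIZE_MS);
    }

    public static void main(String[] args) {
        // Hypothetical event created at 59.9 s that reaches the operator 250 ms later.
        long eventTimestampMs = 59_900L;
        long arrivalTimeMs = eventTimestampMs + 250L;

        // Event time: assigned by the timestamp carried in the event.
        System.out.println("event-time window start:      " + windowStart(eventTimestampMs));
        // Processing time: assigned by the wall clock when the event arrives.
        System.out.println("processing-time window start: " + windowStart(arrivalTimeMs));
    }
}
```

With these sample values the event-time assignment gives the window starting at 50000 and the processing-time assignment gives the one starting at 60000, which is the kind of leak into the following window described in the first message.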
|
Thanks for the information. Strangely enough, after I set the time characteristic to EventTime, the events are leaking into the previous window:

    ...
    Mon, 04 Jul 2016 19:10:49 CST
    Mon, 04 Jul 2016 19:10:50 CST # ?
    Mon, 04 Jul 2016 19:10:50 CST
    Mon, 04 Jul 2016 19:10:50 CST
    Mon, 04 Jul 2016 19:10:50 CST
    Mon, 04 Jul 2016 19:10:50 CST
    Mon, 04 Jul 2016 19:10:50 CST
    100 events in this window
    Mon, 04 Jul 2016 19:10:50 CST
    Mon, 04 Jul 2016 19:10:50 CST
    Mon, 04 Jul 2016 19:10:50 CST
    Mon, 04 Jul 2016 19:10:50 CST
    Mon, 04 Jul 2016 19:10:51 CST
    Mon, 04 Jul 2016 19:10:51 CST

On 4 July 2016 at 16:15, Aljoscha Krettek <[hidden email]> wrote:
|
Could you please elaborate a bit on what exactly the output means and how you derive that events are leaking into the previous window? On Mon, 4 Jul 2016 at 13:20 Yukun Guo <[hidden email]> wrote:
|
The output lists the timestamps of the events as strings. (For convenience, the payload of each event is exactly its timestamp.) As soon as the folding of a time window is finished, the code prints "# events in this window" to mark the end of the window. The 10s windows should be [19:10:40, 19:10:49], [19:10:50, 19:10:59], ..., but in the example above, the events at 19:10:50, which belong to the [19:10:50, 19:10:59] window, were mistakenly put in the [19:10:40, 19:10:49] one.

On 4 July 2016 at 21:41, Aljoscha Krettek <[hidden email]> wrote:
|
The order in which elements are added to internal buffers and the point in time when FoldFunction.fold() is called don't indicate to which window elements are added. Flink will internally keep a buffer for each window and emit the window once the watermark passes the end of the window. In your case, there could be several windows in-flight at one given time. So the elements with a timestamp in [19:10:40, 19:10:49] will be added to that window and elements with a timestamp in [19:10:50, 19:10:59] will be added to this other window. On Tue, 5 Jul 2016 at 04:35 Yukun Guo <[hidden email]> wrote:
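The buffering behaviour described above can be sketched in plain Java as follows. This is a simplified model under stated assumptions, not Flink's actual operator: the class and method names are hypothetical, each window's buffer is reduced to a count, and a window is emitted once the watermark reaches its last timestamp (window end minus 1 ms).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class InFlightWindowsSketch {
    static final long WINDOW_SIZE_MS = 10_000L;

    // One buffer per in-flight window, keyed by window start (here only a count).
    private final TreeMap<Long, Integer> counts = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();

    // The element goes to the window chosen by its timestamp, whatever the arrival order.
    void onElement(long timestampMs) {
        long start = timestampMs - (timestampMs % WINDOW_SIZE_MS);
        counts.merge(start, 1, Integer::sum);
    }

    // Emit every buffered window whose last timestamp (end - 1) is <= the watermark.
    void onWatermark(long watermarkMs) {
        while (!counts.isEmpty() && counts.firstKey() + WINDOW_SIZE_MS - 1 <= watermarkMs) {
            Map.Entry<Long, Integer> w = counts.pollFirstEntry();
            long end = w.getKey() + WINDOW_SIZE_MS;
            emitted.add("[" + w.getKey() + ", " + end + "): " + w.getValue() + " events");
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        InFlightWindowsSketch op = new InFlightWindowsSketch();
        op.onElement(48_000L);
        op.onElement(49_000L);
        op.onElement(50_000L); // arrives before the first window is emitted, buffered in the second
        op.onElement(50_500L);
        op.onWatermark(49_999L); // first window is emitted only now
        op.onElement(51_000L);
        op.onWatermark(59_999L);
        op.emitted().forEach(System.out::println);
        // prints:
        // [40000, 50000): 2 events
        // [50000, 60000): 3 events
    }
}
```

In this model the elements at 50000 and 50500 arrive before the first window is emitted, so a log of arrivals would show them ahead of the first window's summary line, yet they are counted in the second window only.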
|
You're right. I forgot to check the counts in the "events in this window" lines: the number of events in each window is what I expected, even though the lines are printed a bit out of order. Thank you for the help!

On 5 July 2016 at 17:37, Aljoscha Krettek <[hidden email]> wrote:
|