Tumbling time window cannot group events properly

Yukun Guo
Hi,

I wrote a program that builds a WindowedStream to compute statistics over the data every 10 seconds. However, I found that events are not strictly grouped into 10-second windows: some of them leak into the adjacent window.

The output looks like this:

Mon, 04 Jul 2016 11:11:50 CST  # 1
Mon, 04 Jul 2016 11:11:50 CST  # 2
# removed for brevity
Mon, 04 Jul 2016 11:11:59 CST  # 99
99 events in this window
Mon, 04 Jul 2016 11:11:59 CST  # This event has been put in the wrong window

Mon, 04 Jul 2016 11:12:00 CST

Here is the code:

import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TimeWindow {

    private static class TimestampAssigner implements AssignerWithPeriodicWatermarks<Long> {
        private final long DELAY = 500;
        private long currentWatermark;

        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentWatermark);
        }

        @Override
        public long extractTimestamp(Long event, long l) {
            currentWatermark = Math.max(currentWatermark, event - DELAY);
            return event;
        }
    }

    public static void main(String[] args) throws Exception {
        final FastDateFormat formatter = FastDateFormat.getInstance("EEE, dd MMM yyyy HH:mm:ss z");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        DataStream<Long> stream = env.addSource(new RichParallelSourceFunction<Long>() {
            private volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<Long> sourceContext) throws Exception {
                while (isRunning) {
                    sourceContext.collect(System.currentTimeMillis());
                    Thread.sleep(200);
                }

                sourceContext.close();
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        stream
            .assignTimestampsAndWatermarks(new TimestampAssigner())
            .keyBy(new KeySelector<Long, Integer>() {
                @Override
                public Integer getKey(Long x) throws Exception {
                    return 0;
                }
            })
            .timeWindow(Time.seconds(10))
            .fold(0, new FoldFunction<Long, Integer>() {
                @Override
                public Integer fold(Integer count, Long x) throws Exception {
                    System.out.println(formatter.format(x));
                    return count + 1;
                }
            })
            .map(new MapFunction<Integer, Void>() {
                @Override
                public Void map(Integer count) throws Exception {
                    System.out.println(count + " events in this window");
                    return null;
                }
            });

        env.execute();
    }
}

It does not happen every time, but if you run the program long enough you will see it.
Adjusting the DELAY value used for watermark generation does not change the behavior.
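The TimestampAssigner above emits a watermark that trails the largest timestamp seen so far by DELAY ms; a watermark W states that no more elements with a timestamp at or before W are expected. The sketch below restates that logic in plain Java so it can be tried without a Flink job. The class and method names are hypothetical, and the watermark is read after every element instead of on Flink's periodic timer. Keep in mind that processing-time windows do not look at watermarks at all, which would be consistent with DELAY making no difference in this program.

```java
// Plain-Java restatement (no Flink dependency) of the post's bounded-delay watermark logic.
// Hypothetical names; this is an illustration, not Flink's implementation.
public class BoundedDelayWatermarkSketch {
    private final long delay;
    // Largest timestamp seen so far. Starting at Long.MIN_VALUE + delay (instead of the
    // post's 0) makes the initial watermark Long.MIN_VALUE without overflowing.
    private long maxTimestamp;

    public BoundedDelayWatermarkSketch(long delay) {
        this.delay = delay;
        this.maxTimestamp = Long.MIN_VALUE + delay;
    }

    // Mirrors extractTimestamp(): the element's payload is its own timestamp.
    public long extractTimestamp(long event) {
        maxTimestamp = Math.max(maxTimestamp, event);
        return event;
    }

    // Mirrors getCurrentWatermark(): elements at or before this time are no longer expected.
    public long currentWatermark() {
        return maxTimestamp - delay;
    }

    // Watermark observed after each element, assuming it is read after every element.
    public static long[] watermarksAfterEach(long[] events, long delay) {
        BoundedDelayWatermarkSketch assigner = new BoundedDelayWatermarkSketch(delay);
        long[] result = new long[events.length];
        for (int i = 0; i < events.length; i++) {
            assigner.extractTimestamp(events[i]);
            result[i] = assigner.currentWatermark();
        }
        return result;
    }

    public static void main(String[] args) {
        // Out-of-order input: 1100 arrives after 1200, and the watermark does not move backwards.
        long[] wm = watermarksAfterEach(new long[] {1000, 1200, 1100, 1400}, 500);
        System.out.println(java.util.Arrays.toString(wm)); // [500, 700, 700, 900]
    }
}
```

The watermark only ever grows, so a late element (1100 here) simply falls behind it; whether such an element is still accepted is up to the window operator, not the assigner.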

Aljoscha Krettek
Hi,
I think it should be as simple as setting event time as the stream time characteristic:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

The problem is that .timeWindow(Time.seconds(10)) uses processing time unless you specify a time characteristic. You can also force an event-time window explicitly by replacing .timeWindow(Time.seconds(10)) in your keyed pipeline with:

.window(TumblingEventTimeWindows.of(Time.seconds(10)))

Cheers,
Aljoscha
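To make the processing-time point concrete, the plain-Java sketch below (no Flink dependency; class and method names are hypothetical) computes which 10 s window an element falls into under each characteristic, using start = t - (t % size), which to my understanding is the zero-offset arithmetic of Flink's tumbling assigners. The element is assumed to be created at 11:11:59.990 CST and to reach the window operator 15 ms later. Under processing time that small delay is enough to move it into the 11:12:00 window, much like the event flagged in the first post.

```java
// Plain-Java sketch (no Flink dependency): which 10 s tumbling window an element is
// assigned to under processing time versus event time. Names are hypothetical.
public class WindowAssignmentSketch {
    public static final long SIZE_MS = 10_000L;

    // Start of the tumbling window containing t, assuming zero offset and t >= 0.
    public static long windowStart(long t) {
        return t - (t % SIZE_MS);
    }

    // Processing time: the operator's clock at arrival decides; the element's timestamp is ignored.
    public static long processingTimeWindow(long eventTimestamp, long arrivalClock) {
        return windowStart(arrivalClock);
    }

    // Event time: the timestamp carried by the element decides.
    public static long eventTimeWindow(long eventTimestamp, long arrivalClock) {
        return windowStart(eventTimestamp);
    }

    public static void main(String[] args) {
        long created = 1_467_601_919_990L; // Mon, 04 Jul 2016 11:11:59.990 CST
        long arrived = created + 15;       // hypothetical 15 ms until the window operator sees it
        System.out.println(eventTimeWindow(created, arrived));      // 1467601910000 (11:11:50 CST)
        System.out.println(processingTimeWindow(created, arrived)); // 1467601920000 (11:12:00 CST)
    }
}
```

One hedged caveat on the second suggestion: in Flink 1.x, setStreamTimeCharacteristic(TimeCharacteristic.EventTime) also sets the auto-watermark interval to 200 ms, while the processing-time default leaves it at 0. If only the window assigner is switched, a periodic assigner may therefore never be polled, and env.getConfig().setAutoWatermarkInterval(...) would have to be set explicitly.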


In reply to Yukun Guo, Mon, 4 Jul 2016 at 06:00.

Yukun Guo
Thanks for the information. Strangely enough, after I set the time characteristic to EventTime, events now leak into the previous window:

...
Mon, 04 Jul 2016 19:10:49 CST
Mon, 04 Jul 2016 19:10:50 CST # ?
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
100 events in this window
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:51 CST
Mon, 04 Jul 2016 19:10:51 CST


In reply to Aljoscha Krettek, 4 July 2016 at 16:15.

Aljoscha Krettek
Could you please elaborate a bit on what exactly the output means and how you conclude that events are leaking into the previous window?

In reply to Yukun Guo, Mon, 4 Jul 2016 at 13:20.

Yukun Guo
The output lines are the event timestamps formatted as strings (for convenience, the payload of each event is its own timestamp). As soon as folding for a time window has finished, the code prints "<count> events in this window", marking the end of that window.

The 10 s windows should be [19:10:40, 19:10:49], [19:10:50, 19:10:59], ..., but in the example above the events at 19:10:50, which belong to the [19:10:50, 19:10:59] window, were put in the [19:10:40, 19:10:49] one by mistake.
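One detail worth pinning down: tumbling windows are half-open intervals [start, start + size), so [19:10:40, 19:10:49] is really [19:10:40.000, 19:10:50.000). Under event time, Flink's default trigger fires a window once the watermark reaches its last millisecond (end - 1). The plain-Java sketch below (hypothetical names, no Flink dependency) checks both points for the timestamps in this log, and shows that with the post's DELAY = 500 ms the earlier window cannot fire before an element at least 499 ms past 19:10:50 has been seen.

```java
// Plain-Java sketch of half-open tumbling windows [start, start + size) and of when an
// event-time window can fire under a bounded-delay watermark. Names are hypothetical.
public class WindowBoundarySketch {
    public static final long SIZE_MS = 10_000L;

    // Start of the window containing t (zero offset, t >= 0).
    public static long windowStart(long t) {
        return t - (t % SIZE_MS);
    }

    // Exclusive end of the window containing t; its last timestamp is windowEnd(t) - 1.
    public static long windowEnd(long t) {
        return windowStart(t) + SIZE_MS;
    }

    // The window fires once watermark >= end - 1. With watermark = maxTimestampSeen - delay,
    // that requires maxTimestampSeen >= end - 1 + delay.
    public static long minTimestampToFire(long windowEnd, long delay) {
        return windowEnd - 1 + delay;
    }

    public static void main(String[] args) {
        long lastMsOf40 = 1_467_630_649_999L; // 19:10:49.999 CST (hypothetical event)
        long firstMsOf50 = lastMsOf40 + 1;    // 19:10:50.000 CST
        System.out.println(windowStart(lastMsOf40) == windowStart(firstMsOf50)); // false
        System.out.println(minTimestampToFire(windowEnd(lastMsOf40), 500) - windowEnd(lastMsOf40)); // 499
    }
}
```

So every element stamped between 19:10:50.000 and roughly 19:10:50.499 is processed while the [19:10:40, 19:10:50) window is still open, even though it is counted in the next window.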

In reply to Aljoscha Krettek, 4 July 2016 at 21:41.

Aljoscha Krettek
Neither the order in which elements are added to internal buffers nor the point in time when FoldFunction.fold() is called tells you which window an element was added to. Flink internally keeps a buffer for each window and emits the window once the watermark passes the end of the window. In your case, several windows can be in flight at the same time, so elements with a timestamp in [19:10:40, 19:10:49] are added to that window and elements with a timestamp in [19:10:50, 19:10:59] are added to the other one.

Looking at your log, the "100 events in this window" message indicates that the watermark probably passed the end of the [19:10:40, 19:10:49] window and the result for that window was emitted. The elements with timestamp 19:10:50 that you see before it in the log were added to the buffer of a later window, which will be emitted at some future time.
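This explanation can be reproduced with a small simulation. The sketch below is plain Java, not Flink's actual window operator; class and method names are hypothetical, and it reads the watermark after every element rather than on Flink's 200 ms timer. It keeps one running count per window, updates it as each element arrives (which, as I understand it, is when Flink applies fold() and hence when the original FoldFunction printed the timestamp), and emits a window only once the watermark reaches its last millisecond. With events every 200 ms, 10 s windows and DELAY = 500 ms, the window starting at 0 cannot fire until an element with timestamp 10499 ms or later has been seen.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (no Flink) of an event-time tumbling window operator with several
// windows in flight. Each window keeps its own incremental fold state (a count) that is
// updated on arrival; a window's result is emitted once watermark >= end - 1.
public class InFlightWindowsSketch {

    public static List<String> run(long[] timestamps, long windowSize, long delay) {
        List<String> log = new ArrayList<>();
        // window start -> running count; TreeMap keeps windows ordered by start time
        TreeMap<Long, Integer> openWindows = new TreeMap<>();
        long maxTimestamp = Long.MIN_VALUE;

        for (long ts : timestamps) {
            long start = ts - (ts % windowSize);
            // Incremental fold on arrival: this is where the original code printed the timestamp.
            openWindows.merge(start, 1, Integer::sum);
            log.add("fold " + ts);

            // Simplification: recompute the bounded-delay watermark after every element.
            maxTimestamp = Math.max(maxTimestamp, ts);
            long watermark = maxTimestamp - delay;

            // Emit every window whose last timestamp (end - 1) is covered by the watermark.
            while (!openWindows.isEmpty() && openWindows.firstKey() + windowSize - 1 <= watermark) {
                Map.Entry<Long, Integer> fired = openWindows.pollFirstEntry();
                log.add(fired.getValue() + " events in window starting at " + fired.getKey());
            }
        }
        return log;
    }

    // Hypothetical input: count events spaced stepMs apart, starting at 0.
    public static long[] evenlySpaced(int count, long stepMs) {
        long[] ts = new long[count];
        for (int i = 0; i < count; i++) {
            ts[i] = i * stepMs;
        }
        return ts;
    }

    public static void main(String[] args) {
        for (String line : run(evenlySpaced(60, 200L), 10_000L, 500L)) {
            System.out.println(line);
        }
    }
}
```

In the printed log, entries 51 to 54 are the folds for 10000, 10200, 10400 and 10600, which appear before "50 events in window starting at 0": the same interleaving as in the log above, even though every window's count is correct.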

In reply to Yukun Guo, Tue, 5 Jul 2016 at 04:35.

Yukun Guo
You're right. I forgot to check that the "events in this window" lines actually showed the expected number of events for each window, even though the individual events were printed somewhat out of order. Thank you for the help!

In reply to Aljoscha Krettek, 5 July 2016 at 17:37.