Posted by
chen on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/flink-eventTime-lateness-maxoutoforderness-tp17318p17441.html
Code that shows the effect of the maxOutOfOrderness time:
dataStream.keyBy(row -> (String) row.getField(0))
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .fold(initRow(), new FoldFunction<Row, Row>() {
        @Override
        public Row fold(Row ret, Row o) throws Exception {
            ret.setField(0, (int) ret.getField(0) + 1);
            ret.setField(1, (String) ret.getField(1) + o.getField(1) + "|");
            return ret;
        }
    });
The watermark method of the timestamp assigner:
public Watermark getCurrentWatermark() {
    return new Watermark(currentTime - 5000);
}
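The fragment above does not show how currentTime is maintained. A minimal plain-Java sketch, assuming currentTime is the largest event timestamp seen so far (this is how Flink's BoundedOutOfOrdernessTimestampExtractor tracks it). The class LaggingWatermark and its methods are hypothetical stand-ins and do not use the Flink API:

```java
// Hypothetical stand-in for a watermark assigner; not part of the Flink API.
class LaggingWatermark {
    private final long maxOutOfOrderness;      // 5000 ms in the post
    private long currentTime = Long.MIN_VALUE; // assumed: largest event timestamp seen so far

    LaggingWatermark(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Called for every element; keeping the maximum means the watermark never moves backwards.
    long extractTimestamp(long eventTimestamp) {
        currentTime = Math.max(currentTime, eventTimestamp);
        return eventTimestamp;
    }

    // Called periodically; the watermark trails the newest event by maxOutOfOrderness.
    long getCurrentWatermark() {
        return currentTime == Long.MIN_VALUE ? Long.MIN_VALUE : currentTime - maxOutOfOrderness;
    }
}
```

Under this assumption, an element with timestamp 1483250641000 that arrives after 1483250650000 does not pull the watermark back: it stays at 1483250645000.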
1. Send data WITHOUT Thread.sleep(). The result looks like this:
1,1483250636000|
4,1483250640000|1483250642000|1483250641000|1483250643000|
4,1483250649000|1483250648000|1483250645000|1483250647000|
2,1483250650000|1483250653000|
1,1483250658000|
3,1483250661000|1483250662000|1483250663000|
1,1483250667000|
2. Send data WITH Thread.sleep(). Here we can see the effect of the
maxOutOfOrderness time: the window computation is delayed before the result
is emitted. The result looks like this:
1,1483250636000|
2,1483250640000|1483250642000|
3,1483250649000|1483250648000|1483250645000|
2,1483250650000|1483250653000|
1,1483250658000|
3,1483250661000|1483250662000|1483250663000|
1,1483250667000|
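One possible reading of the two results, as a plain-Java simulation rather than Flink code. It assumes that with Thread.sleep() the periodic watermark advances between elements, that without it all elements arrive before the first watermark is emitted, that allowed lateness is 0, and that the end of input flushes all remaining windows. The class WindowSim, its methods, and the input order used below are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Plain-Java simulation; class and method names are hypothetical, not Flink API.
class WindowSim {
    static final long WINDOW_SIZE = 5000L;          // TumblingEventTimeWindows.of(Time.seconds(5))
    static final long MAX_OUT_OF_ORDERNESS = 5000L; // the "- 5000" in getCurrentWatermark()

    // watermarkPerElement = true models sending WITH Thread.sleep(): the watermark
    // is assumed to advance after every element. false models sending WITHOUT sleep:
    // all elements are assumed to arrive before the first watermark is emitted.
    static List<String> run(long[] timestamps, boolean watermarkPerElement) {
        TreeMap<Long, List<Long>> windows = new TreeMap<>(); // window start -> buffered timestamps
        List<String> output = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;

        for (long ts : timestamps) {
            long start = ts - (ts % WINDOW_SIZE);
            long lastTimestampInWindow = start + WINDOW_SIZE - 1;
            if (lastTimestampInWindow <= watermark) {
                continue; // window already fired and allowed lateness is 0: element is dropped
            }
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
            maxTimestamp = Math.max(maxTimestamp, ts);
            if (watermarkPerElement) {
                watermark = maxTimestamp - MAX_OUT_OF_ORDERNESS;
                fire(windows, watermark, output);
            }
        }
        fire(windows, Long.MAX_VALUE, output); // assumed end-of-input flush
        return output;
    }

    // Emits and removes every window whose last timestamp is at or below the watermark.
    static void fire(TreeMap<Long, List<Long>> windows, long watermark, List<String> output) {
        while (!windows.isEmpty() && windows.firstKey() + WINDOW_SIZE - 1 <= watermark) {
            List<Long> elements = windows.pollFirstEntry().getValue();
            StringBuilder sb = new StringBuilder().append(elements.size()).append(',');
            for (long ts : elements) {
                sb.append(ts).append('|');
            }
            output.add(sb.toString());
        }
    }
}
```

With the hypothetical input order 1000, 3000, 11000, 2000, 4000, 16000, run(input, false) emits 4,1000|3000|2000|4000| for the first window. run(input, true) emits 2,1000|3000| instead, because 11000 moves the watermark to 6000, the window [0, 5000) fires, and 2000 and 4000 then arrive too late. This matches the pattern in the two results above, where windows lose elements only when the data is sent with Thread.sleep().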
I don't know how to explain eventTime, lateness, and maxOutOfOrderness.