Posted by
chen on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/flink-eventTime-lateness-maxoutoforderness-tp17318p17440.html
Hi Eron,
Thanks for your help. I understand that maxOutOfOrderness and allowedLateness
are both based on event time, but my test does not seem to behave that way.
My test data and code follow.
"key1|1483250640000|",
"key1|1483250636000|",
"key1|1483250649000|",
"key1|1483250642000|",
"key1|1483250650000|",
"key1|1483250641000|",
"key1|1483250653000|",
"key1|1483250648000|",
"key1|1483250645000|",
"key1|1483250658000|",
"key1|1483250647000|",
"key1|1483250643000|",
"key1|1483250661000|",
"key1|1483250662000|",
"key1|1483250667000|",
"key1|1483250663000|",
dataStream.keyBy(row -> (String) row.getField(0))
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .allowedLateness(Time.seconds(5))
    .fold(initRow(), new FoldFunction<Row, Row>() {
        @Override
        public Row fold(Row ret, Row o) throws Exception {
            // field 0: number of records folded into this window so far
            ret.setField(0, (int) ret.getField(0) + 1);
            // field 1: the event timestamps seen so far, joined with "|"
            ret.setField(1, (String) ret.getField(1) + o.getField(1) + "|");
            return ret;
        }
    })
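As a sanity check on the window boundaries, here is a minimal plain-Java sketch (not Flink code) of how a 5-second tumbling event-time window with no offset buckets the timestamps above. The class and method names are made up for illustration, and the `timestamp - (timestamp % size)` rule assumes non-negative timestamps and a zero window offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; it does not use any Flink classes.
class TumblingWindowSketch {
    // The test timestamps from this post, in arrival order.
    static final long[] DATA = {
        1483250640000L, 1483250636000L, 1483250649000L, 1483250642000L,
        1483250650000L, 1483250641000L, 1483250653000L, 1483250648000L,
        1483250645000L, 1483250658000L, 1483250647000L, 1483250643000L,
        1483250661000L, 1483250662000L, 1483250667000L, 1483250663000L
    };

    // Start of the tumbling window containing the timestamp
    // (assumes zero offset and a non-negative timestamp).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Groups timestamps by window start, keeping arrival order inside each window.
    static Map<Long, List<Long>> groupByWindow(long[] timestamps, long sizeMs) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            windows.computeIfAbsent(windowStart(ts, sizeMs), k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }

    public static void main(String[] args) {
        long size = 5000L;
        groupByWindow(DATA, size).forEach((start, elems) ->
            System.out.println("[" + start + ", " + (start + size) + ") -> "
                + elems.size() + " " + elems));
    }
}
```

Under these assumptions the 16 records fall into 7 windows, holding 1, 4, 4, 2, 1, 3 and 1 records in window order.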
1. Sending the data WITHOUT Thread.sleep() between records gives this result:
1,1483250636000|
4,1483250640000|1483250642000|1483250641000|1483250643000|
4,1483250649000|1483250648000|1483250645000|1483250647000|
2,1483250650000|1483250653000|
1,1483250658000|
3,1483250661000|1483250662000|1483250663000|
1,1483250667000|
2. Sending the data WITH Thread.sleep() between records gives the result below.
Here we can see allowedLateness at work: each late record triggers its window
to be calculated again, so the result for that window is emitted again.
1,1483250636000|
1,1483250640000|
2,1483250640000|1483250642000|
1,1483250649000|
2,1483250649000|1483250648000|
3,1483250649000|1483250648000|1483250645000|
2,1483250650000|1483250653000|
1,1483250658000|
2,1483250661000|1483250662000|
3,1483250661000|1483250662000|1483250663000|
1,1483250667000|
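One possible reading of the two results can be sketched as a small plain-Java simulation (not Flink code). It rests on assumptions that are not shown in this post: that with Thread.sleep() the watermark catches up to the highest timestamp seen so far after every record (an effective out-of-orderness bound of zero), and that without sleep no watermark is emitted until the input ends. A window fires when the watermark passes its last timestamp, a late record inside the allowed lateness re-fires its window, and a record whose window end plus lateness is at or behind the watermark is dropped. All names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation for illustration only; it does not use any Flink classes.
class LatenessSketch {
    static final long SIZE = 5000L;      // window size in ms, as in the job above
    static final long LATENESS = 5000L;  // allowedLateness in ms, as in the job above
    static final long[] DATA = {
        1483250640000L, 1483250636000L, 1483250649000L, 1483250642000L,
        1483250650000L, 1483250641000L, 1483250653000L, 1483250648000L,
        1483250645000L, 1483250658000L, 1483250647000L, 1483250643000L,
        1483250661000L, 1483250662000L, 1483250667000L, 1483250663000L
    };

    // Same layout as the fold output: count, then timestamps joined with "|".
    static String format(List<Long> elems) {
        StringBuilder sb = new StringBuilder().append(elems.size()).append(',');
        for (long ts : elems) sb.append(ts).append('|');
        return sb.toString();
    }

    // Moves the watermark forward: fires windows whose last timestamp is newly
    // covered, then purges windows whose allowed lateness has expired.
    static long advance(TreeMap<Long, List<Long>> state, List<String> out, long oldWm, long newWm) {
        for (Map.Entry<Long, List<Long>> e : state.entrySet()) {
            long maxTs = e.getKey() + SIZE - 1;
            if (maxTs > oldWm && maxTs <= newWm) out.add(format(e.getValue()));
        }
        state.keySet().removeIf(start -> start + SIZE - 1 + LATENESS <= newWm);
        return newWm;
    }

    // watermarkPerRecord = true models the run with Thread.sleep() (ASSUMPTION:
    // watermark == highest timestamp seen); false models the run without it.
    static List<String> run(long[] data, boolean watermarkPerRecord) {
        TreeMap<Long, List<Long>> state = new TreeMap<>();
        List<String> out = new ArrayList<>();
        long wm = Long.MIN_VALUE;
        for (long ts : data) {
            long start = ts - (ts % SIZE);
            long maxTs = start + SIZE - 1;
            if (maxTs + LATENESS <= wm) continue;           // too late: dropped
            state.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
            if (maxTs <= wm) out.add(format(state.get(start))); // late firing
            if (watermarkPerRecord && ts > wm) wm = advance(state, out, wm, ts);
        }
        advance(state, out, wm, Long.MAX_VALUE);            // end of input
        return out;
    }

    public static void main(String[] args) {
        System.out.println("with per-record watermarks:");
        run(DATA, true).forEach(System.out::println);
        System.out.println("with a single final watermark:");
        run(DATA, false).forEach(System.out::println);
    }
}
```

Under these assumptions, run(DATA, true) yields the same 11 lines as the second result and run(DATA, false) the same 7 lines as the first, which would be consistent with lateness being judged against the event-time watermark rather than against wall-clock time.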