eventTime, lateness, maxoutoforderness are all about time.
event Time is the water mark time on the record. lateness is record time or the real word time? maxoutoforderness is record time or the real word time? dataStream.keyBy(row -> (String)row.getField(0)) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .allowedLateness(Time.seconds(5)) .fold(initRow(), new MyFoldFunction()) public Watermark getCurrentWatermark() { return new Watermark(currentTime - 5000);} Does anyone could explain the time of eventTime,lateness,maxoutoforderness? -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Take a look at the section of Flink documentation titled "Event Time and Watermarks": Also read the excellent series "Streaming 101" and "102", has useful animations depicting the flow of time: Think of the watermark as a clock, ticking along due to information from the connector or from a watermark generator. Hope this helps, Eron On Sat, Dec 16, 2017 at 8:07 AM, chen <[hidden email]> wrote: eventTime, lateness, maxoutoforderness are all about time. |
In reply to this post by chen
Hi, lateness is record time or the real word time? i.e., - given the end timestamp of a window is x (in event time) and allowed lateness is y, the window state is cleared only when the current watermark of the window operator passes x+y - the BoundedOutOfOrdernessTimestampExtractor emits watermarks that lag behind the max record timestamp (in event time) by a fixed amount of time (again, in event time). Cheers, Gordon
On 16 December 2017 at 8:07:15 AM, chen ([hidden email]) wrote:
|
In reply to this post by Eron Wright
Hi Eron,
Thanks for your help. Actually I know maxoutoforder, lateness is based on Event Time. But in my test it is not. Following is my code and test data. "key1|1483250640000|", "key1|1483250636000|", "key1|1483250649000|", "key1|1483250642000|", "key1|1483250650000|", "key1|1483250641000|", "key1|1483250653000|", "key1|1483250648000|", "key1|1483250645000|", "key1|1483250658000|", "key1|1483250647000|", "key1|1483250643000|", "key1|1483250661000|", "key1|1483250662000|", "key1|1483250667000|", "key1|1483250663000|", dataStream.keyBy(row -> (String)row.getField(0)) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .allowedLateness(Time.seconds(5)) .fold(initRow(), new FoldFunction<Row, Row>() { @Override public Row fold(Row ret, Row o) throws Exception { ret.setField(0, (int)ret.getField(0) + 1); ret.setField(1, (String)ret.getField(1)+ o.getField(1) + "|"); return ret; } }) 1. Send Data *WITHOUT*Thread.sleep(), the result is like this : 1,1483250636000| 4,1483250640000|1483250642000|1483250641000|1483250643000| 4,1483250649000|1483250648000|1483250645000|1483250647000| 2,1483250650000|1483250653000| 1,1483250658000| 3,1483250661000|1483250662000|1483250663000| 1,1483250667000| 2. Send Data WITH Thread.sleep(), the result is like this, we will see the function of allowedLateness, it will trigger the window to calculate again, the result will come out again. 1,1483250636000| 1,1483250640000| 2,1483250640000|1483250642000| 1,1483250649000| 2,1483250649000|1483250648000| 3,1483250649000|1483250648000|1483250645000| 2,1483250650000|1483250653000| 1,1483250658000| 2,1483250661000|1483250662000| 3,1483250661000|1483250662000|1483250663000| 1,1483250667000| -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
In reply to this post by Eron Wright
CODE with maxOutOfOrdernesstime effect: dataStream.keyBy(row -> (String)row.getField(0)) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .fold(initRow(), new FoldFunction<Row, Row>() { @Override public Row fold(Row ret, Row o) throws Exception { ret.setField(0, (int)ret.getField(0) + 1); ret.setField(1, (String)ret.getField(1)+ o.getField(1) + "|"); return ret; } }); public Watermark getCurrentWatermark(){ return new Watermark(currentTime - 5000);} 1. Send Data *WITHOUT*Thread.sleep(), the result is like this : 1,1483250636000| 4,1483250640000|1483250642000|1483250641000|1483250643000| 4,1483250649000|1483250648000|1483250645000|1483250647000| 2,1483250650000|1483250653000| 1,1483250658000| 3,1483250661000|1483250662000|1483250663000| 1,1483250667000| 2. Send Data WITH Thread.sleep(), the result is like this, we will see the function of maxOutOfOrdernesstime, it will delay calculate, then coming out result. 1,1483250636000| 2,1483250640000|1483250642000| 3,1483250649000|1483250648000|1483250645000| 2,1483250650000|1483250653000| 1,1483250658000| 3,1483250661000|1483250662000|1483250663000| 1,1483250667000| I don`t know how to explain the eventTime, lateness, maxOutOfOrderness. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
In reply to this post by Tzu-Li (Gordon) Tai
Thanks Gordon, Please see the rely. I use code, but the result it doesn`t
like what the doc explain. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Free forum by Nabble | Edit this page |