flink eventTime, lateness, maxoutoforderness


flink eventTime, lateness, maxoutoforderness

chen
eventTime, lateness, and maxOutOfOrderness are all about time.
Event time is the watermark time on the record.
Is lateness measured in record time or real-world time?
Is maxOutOfOrderness measured in record time or real-world time?

dataStream.keyBy(row -> (String) row.getField(0))
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .allowedLateness(Time.seconds(5))
        .fold(initRow(), new MyFoldFunction());

public Watermark getCurrentWatermark() {
    return new Watermark(currentTime - 5000);
}

Could anyone explain which time eventTime, lateness, and maxOutOfOrderness refer to?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: flink eventTime, lateness, maxoutoforderness

Eron Wright
Take a look at the section of the Flink documentation titled "Event Time and Watermarks".

Also read the excellent series "Streaming 101" and "Streaming 102", which has useful animations depicting the flow of time.

Think of the watermark as a clock that ticks forward based on information from the connector or from a watermark generator.

Hope this helps,
Eron 


On Sat, Dec 16, 2017 at 8:07 AM, chen <[hidden email]> wrote:
eventTime, lateness, and maxOutOfOrderness are all about time.
Event time is the watermark time on the record.
Is lateness measured in record time or real-world time?
Is maxOutOfOrderness measured in record time or real-world time?

dataStream.keyBy(row -> (String) row.getField(0))
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .allowedLateness(Time.seconds(5))
        .fold(initRow(), new MyFoldFunction());

public Watermark getCurrentWatermark() {
    return new Watermark(currentTime - 5000);
}

Could anyone explain which time eventTime, lateness, and maxOutOfOrderness refer to?




Re: flink eventTime, lateness, maxoutoforderness

Tzu-Li (Gordon) Tai
In reply to this post by chen
Hi,

Is lateness measured in record time or real-world time?
Is maxOutOfOrderness measured in record time or real-world time?

Both the allowed lateness of window operators and the maxOutOfOrderness of the BoundedOutOfOrdernessTimestampExtractor refer to event time.

That is:
- given a window whose end timestamp is x (in event time) and an allowed lateness of y, the window state is cleared only once the current watermark of the window operator passes x+y
- the BoundedOutOfOrdernessTimestampExtractor emits watermarks that lag behind the max record timestamp (in event time) by a fixed amount of time (again, in event time).
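
The two rules above come down to simple event-time arithmetic. As a minimal sketch in plain Java (no Flink dependency; `EventTimeRules` and its method names are made up for illustration and are not part of the Flink API), assuming millisecond timestamps and the 5-second values from the original post:

```java
/** Illustrative helpers only; these names are not part of the Flink API. */
final class EventTimeRules {
    /** Event-time instant x + y that the watermark must pass before window state is cleared. */
    static long cleanupTime(long windowEnd, long allowedLateness) {
        return windowEnd + allowedLateness;
    }

    /** Watermark lagging the largest event timestamp seen so far by a fixed event-time bound. */
    static long watermark(long maxTimestampSeen, long maxOutOfOrderness) {
        return maxTimestampSeen - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        long windowEnd = 1483250645000L; // x: end of one 5-second window
        System.out.println(cleanupTime(windowEnd, 5000L));    // prints 1483250650000
        System.out.println(watermark(1483250649000L, 5000L)); // prints 1483250644000
    }
}
```

Neither computation reads the wall clock: both inputs and both results are event-time values.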

Hope this clarifies things for you!

Cheers,
Gordon


Re: flink eventTime, lateness, maxoutoforderness

chen
In reply to this post by Eron Wright
Hi Eron,
    Thanks for your help. I understand that maxOutOfOrderness and lateness are
based on event time, but my test does not seem to behave that way. My code and
test data follow.
         "key1|1483250640000|",
         "key1|1483250636000|",
         "key1|1483250649000|",
         "key1|1483250642000|",
         "key1|1483250650000|",
         "key1|1483250641000|",
         "key1|1483250653000|",
         "key1|1483250648000|",
         "key1|1483250645000|",
         "key1|1483250658000|",
         "key1|1483250647000|",
         "key1|1483250643000|",
         "key1|1483250661000|",
         "key1|1483250662000|",
         "key1|1483250667000|",
         "key1|1483250663000|",

         dataStream.keyBy(row -> (String) row.getField(0))
                 .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                 .allowedLateness(Time.seconds(5))
                 .fold(initRow(), new FoldFunction<Row, Row>() {
                     @Override
                     public Row fold(Row ret, Row o) throws Exception {
                         // Field 0 counts the records; field 1 concatenates their timestamps.
                         ret.setField(0, (int) ret.getField(0) + 1);
                         ret.setField(1, (String) ret.getField(1) + o.getField(1) + "|");
                         return ret;
                     }
                 });

1. Sending the data *WITHOUT* Thread.sleep() gives this result:
         1,1483250636000|
         4,1483250640000|1483250642000|1483250641000|1483250643000|
         4,1483250649000|1483250648000|1483250645000|1483250647000|
         2,1483250650000|1483250653000|
         1,1483250658000|
         3,1483250661000|1483250662000|1483250663000|
         1,1483250667000|
2. Sending the data WITH Thread.sleep() gives the result below. Here we can
see the effect of allowedLateness: a late record triggers the window to be
computed again, so its result is emitted again.
          1,1483250636000|
          1,1483250640000|
          2,1483250640000|1483250642000|
          1,1483250649000|
          2,1483250649000|1483250648000|
          3,1483250649000|1483250648000|1483250645000|
          2,1483250650000|1483250653000|
          1,1483250658000|
          2,1483250661000|1483250662000|
          3,1483250661000|1483250662000|1483250663000|
          1,1483250667000|
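
One way to read the two result sets above is that both are consistent with event-time semantics, and that they differ only in when the watermark gets a chance to advance. The thread does not confirm this, so the following is a sketch under assumptions: a plain-Java toy model (no Flink dependency; `LatenessSimulation` and its methods are made-up names) in which the watermark equals the largest timestamp seen so far. With `perRecordWatermark = true` the watermark advances after every record, as it plausibly does when Thread.sleep() leaves time for a watermark between records; with `false` it advances only when the input ends, as it plausibly does when all 16 records arrive in one burst.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of a 5 s tumbling event-time window with allowed lateness; not Flink code. */
final class LatenessSimulation {
    static final long SIZE = 5_000L;     // window size (ms, event time)
    static final long LATENESS = 5_000L; // allowed lateness (ms, event time)
    static final long[] DATA = {         // test data from the post, in arrival order
        1483250640000L, 1483250636000L, 1483250649000L, 1483250642000L,
        1483250650000L, 1483250641000L, 1483250653000L, 1483250648000L,
        1483250645000L, 1483250658000L, 1483250647000L, 1483250643000L,
        1483250661000L, 1483250662000L, 1483250667000L, 1483250663000L};

    static List<String> run(long[] timestamps, boolean perRecordWatermark) {
        TreeMap<Long, List<Long>> windows = new TreeMap<>(); // window start -> contents
        List<String> out = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        long maxSeen = Long.MIN_VALUE;
        for (long ts : timestamps) {
            maxSeen = Math.max(maxSeen, ts);
            long start = ts - (ts % SIZE);
            long lastMs = start + SIZE - 1;          // last millisecond inside the window
            if (lastMs + LATENESS <= watermark) {
                continue;                            // too late: window state already cleared
            }
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
            if (lastMs <= watermark) {
                out.add(format(windows.get(start))); // late but allowed: fire again
            }
            if (perRecordWatermark) {
                watermark = advance(windows, out, watermark, maxSeen);
            }
        }
        advance(windows, out, watermark, Long.MAX_VALUE); // end of input flushes everything
        return out;
    }

    /** Fires each window the watermark newly passes, then drops state past its lateness. */
    static long advance(TreeMap<Long, List<Long>> windows, List<String> out,
                        long oldWm, long newWm) {
        for (Map.Entry<Long, List<Long>> e : windows.entrySet()) {
            long lastMs = e.getKey() + SIZE - 1;
            if (lastMs > oldWm && lastMs <= newWm) {
                out.add(format(e.getValue()));
            }
        }
        windows.keySet().removeIf(start -> start + SIZE - 1 + LATENESS <= newWm);
        return newWm;
    }

    /** Same shape as the fold output in the post: count, then the timestamps. */
    static String format(List<Long> contents) {
        StringBuilder sb = new StringBuilder().append(contents.size()).append(',');
        for (long ts : contents) {
            sb.append(ts).append('|');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        run(DATA, false).forEach(System.out::println); // burst input
        System.out.println("----");
        run(DATA, true).forEach(System.out::println);  // slow input
    }
}
```

Under these assumptions, `run(DATA, false)` yields the seven lines of result 1 and `run(DATA, true)` yields the eleven lines of result 2, including the repeated firings for late records and the dropped records 1483250641000, 1483250647000, and 1483250643000. Every decision in the model compares an event timestamp with the watermark; no wall-clock value enters the window logic.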





Re: flink eventTime, lateness, maxoutoforderness

chen
In reply to this post by Eron Wright

Code showing the effect of maxOutOfOrderness:
        dataStream.keyBy(row -> (String) row.getField(0))
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .fold(initRow(), new FoldFunction<Row, Row>() {
                    @Override
                    public Row fold(Row ret, Row o) throws Exception {
                        // Field 0 counts the records; field 1 concatenates their timestamps.
                        ret.setField(0, (int) ret.getField(0) + 1);
                        ret.setField(1, (String) ret.getField(1) + o.getField(1) + "|");
                        return ret;
                    }
                });

        public Watermark getCurrentWatermark() {
            return new Watermark(currentTime - 5000);
        }

1. Sending the data *WITHOUT* Thread.sleep() gives this result:
        1,1483250636000|
        4,1483250640000|1483250642000|1483250641000|1483250643000|
        4,1483250649000|1483250648000|1483250645000|1483250647000|
        2,1483250650000|1483250653000|
        1,1483250658000|
        3,1483250661000|1483250662000|1483250663000|
        1,1483250667000|

2. Sending the data WITH Thread.sleep() gives the result below. Here we can
see the effect of maxOutOfOrderness: it delays the computation, and each
result comes out later.
        1,1483250636000|
        2,1483250640000|1483250642000|
        3,1483250649000|1483250648000|1483250645000|
        2,1483250650000|1483250653000|
        1,1483250658000|
        3,1483250661000|1483250662000|1483250663000|
        1,1483250667000|
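
Result 2 above can be reproduced with a similar toy model, again only as a sketch under assumptions the thread does not state: that `currentTime` in `getCurrentWatermark()` holds the largest timestamp seen so far, and that Thread.sleep() leaves enough time for the watermark to be refreshed after every record. The class `OutOfOrdernessSimulation` and its methods are made-up names, and no Flink dependency is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of a 5 s tumbling window behind a watermark that lags by 5 s; not Flink code. */
final class OutOfOrdernessSimulation {
    static final long SIZE = 5_000L;                 // window size (ms, event time)
    static final long MAX_OUT_OF_ORDERNESS = 5_000L; // watermark lag (ms, event time)
    static final long[] DATA = {                     // test data from the post, in arrival order
        1483250640000L, 1483250636000L, 1483250649000L, 1483250642000L,
        1483250650000L, 1483250641000L, 1483250653000L, 1483250648000L,
        1483250645000L, 1483250658000L, 1483250647000L, 1483250643000L,
        1483250661000L, 1483250662000L, 1483250667000L, 1483250663000L};

    static List<String> run(long[] timestamps) {
        TreeMap<Long, List<Long>> windows = new TreeMap<>(); // window start -> contents
        List<String> out = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        long maxSeen = Long.MIN_VALUE;
        for (long ts : timestamps) {
            long start = ts - (ts % SIZE);
            if (start + SIZE - 1 > watermark) {
                // The window is still open in event time, so the record is accepted.
                windows.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
            } // otherwise the record is late and, without allowed lateness, dropped
            maxSeen = Math.max(maxSeen, ts);
            watermark = maxSeen - MAX_OUT_OF_ORDERNESS; // assumed meaning of currentTime - 5000
            fire(windows, out, watermark);
        }
        fire(windows, out, Long.MAX_VALUE);              // end of input flushes everything
        return out;
    }

    /** Emits and removes, oldest first, every window whose last millisecond the watermark reached. */
    static void fire(TreeMap<Long, List<Long>> windows, List<String> out, long watermark) {
        while (!windows.isEmpty() && windows.firstKey() + SIZE - 1 <= watermark) {
            Map.Entry<Long, List<Long>> e = windows.pollFirstEntry();
            StringBuilder sb = new StringBuilder().append(e.getValue().size()).append(',');
            for (long ts : e.getValue()) {
                sb.append(ts).append('|');
            }
            out.add(sb.toString());
        }
    }

    public static void main(String[] args) {
        run(DATA).forEach(System.out::println);
    }
}
```

Under these assumptions, `run(DATA)` yields the seven lines of result 2. The window starting at 1483250640000 is emitted with only two records because the watermark reaches 1483250645000 (from record 1483250650000 minus the 5-second lag) before 1483250641000 and 1483250643000 arrive. That delay is counted in event time, not in wall-clock time.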

I don't know how to explain eventTime, lateness, and maxOutOfOrderness given these results.




Re: flink eventTime, lateness, maxoutoforderness

chen
In reply to this post by Tzu-Li (Gordon) Tai
Thanks, Gordon. Please see my reply. I ran the code, but the result does not
match what the docs explain.



