Hi,
I am a little confused about watermarks in Flink.

My application is using EventTime. My sources are calling ctx.collectWithTimestamp and ctx.emitWatermark. Then I have a CoProcessFunction which merges the two streams. I keep state in this function, and I want to clear this state every time the window of my next operator is triggered. The next operator is a join which uses a window of 1 minute [1].

    stream01 = source01.connect(sideoutput02).keyBy().process(new MyCoProcessFunction());
    stream02 = source02.connect(sideoutput01).keyBy().process(new MyCoProcessFunction());
    stream01.join(stream02).window(60 sec).apply(new MyJoinFunction()).print();

I am not sure whether I have to use env.getConfig().setAutoWatermarkInterval(60 seconds), or whether I have to add .assignTimestampsAndWatermarks(new MyAssignerWithPeriodicWatermarks()) and write the logic in the method getCurrentWatermark(). Since I want a watermark every 60 seconds, I guess this method (getCurrentWatermark()) should contain "return new Watermark(System.currentTimeMillis() + 60000);", but I don't know whether it should be - or +.

Then, in the CoProcessFunction, what time should I pass to context.timerService().registerEventTimeTimer(), and what logic should I use in the onTimer() method?

Thanks,
Felipe
What watermarks do is advance the event-time clock. You can consider a Watermark(t) as an assertion about the completeness of the stream: it marks a point in the stream and says that at that point, the stream is (probably) now complete up to time t.

The autoWatermarkInterval determines how often new watermarks are created, or in other words, how often the event-time clock will be able to move forward. From what you've presented, it seems like you can leave this at its default, which is 200 msec. This means that five times a second, as your application runs, each parallel instance will create a new watermark (assuming there's been new data and that the event-time clock can be advanced).

getCurrentWatermark() should NOT be implemented in terms of System.currentTimeMillis: you do not want your watermarking to depend on the current processing time if you can possibly avoid it. Part of the beauty of event-time processing is being able to run your application on historic data as well as on live, real-time data, and this is only possible if your watermarks depend on timestamps recorded in the events rather than on System.currentTimeMillis.

You should also try to decouple your watermarking strategy from the specific processing you intend to do later, downstream. The primary concern when implementing the watermarking is how much out-of-orderness your data may have. A typical timestamp assigner and watermark generator will look something like this, assuming that your event stream will have its timestamps at most 10 seconds out of order, and that your events have a timestamp field:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Watermarks trail the largest timestamp seen so far by 10 seconds.
    DataStream<MyEvent> withTimestampsAndWatermarks =
        stream.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {

                @Override
                public long extractTimestamp(MyEvent element) {
                    return element.timestamp;
                }
            });

As for your specific application requirements, you might find it simpler to rely on State Time-to-Live [1] rather than clearing state yourself.
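To make the +/- question concrete, here is a plain-Java model of what a bounded-out-of-orderness generator computes. It is a sketch, not Flink code: the class and method names are hypothetical. It remembers the largest event timestamp and emits that value MINUS the bound, never consulting the wall clock, so replaying historic events behaves exactly like live data. It also models, assuming 60-second tumbling windows aligned to the epoch, when the watermark is high enough for the window [0, 60000) to fire.

```java
// Plain-Java model, NOT Flink's API: all class names here are hypothetical.
class WatermarkDemo {
    public static void main(String[] args) {
        BoundedOutOfOrdernessWatermarks generator = new BoundedOutOfOrdernessWatermarks(10_000);
        // Historic, slightly out-of-order event timestamps in milliseconds.
        long[] eventTimestamps = {1_000, 5_000, 3_000, 62_000, 71_000, 65_000};
        for (long ts : eventTimestamps) {
            generator.onEvent(ts);
            long watermark = generator.currentWatermark();
            boolean firstWindowFires = TumblingWindows.canFire(0, 60_000, watermark);
            System.out.println("event=" + ts + " watermark=" + watermark
                    + " window [0, 60000) can fire: " + firstWindowFires);
        }
    }
}

// The watermark trails the largest event timestamp seen so far by a fixed bound.
class BoundedOutOfOrdernessWatermarks {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen;

    BoundedOutOfOrdernessWatermarks(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start so that the first watermark is Long.MIN_VALUE instead of underflowing.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    // Called for every event, with the timestamp carried by the event itself.
    void onEvent(long eventTimestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
    }

    // Polled periodically (in Flink, every autoWatermarkInterval; 200 ms by default).
    // Note the MINUS, and that System.currentTimeMillis() is never consulted.
    long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }
}

// Event-time tumbling windows aligned to the epoch.
class TumblingWindows {
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Window [start, start + size) is complete once the watermark reaches its last millisecond.
    static boolean canFire(long windowStartMs, long sizeMs, long watermarkMs) {
        return watermarkMs >= windowStartMs + sizeMs - 1;
    }
}
```

In this run the window [0, 60000) only becomes fireable once the event at 71000 ms arrives (watermark 61000 ms). Polling the generator more often just makes firing more timely; the polling interval does not need to match the 60-second window size.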
There's no need to retain the state until the windowed join is completed, since the operator executing the join can't access the state in the CoProcessFunction. The CoProcessFunction should clear the state whenever it is done with it; no other part of your job will access it.

If there is a risk that the CoProcessFunction will create state that isn't freed, and for some reason you don't find State TTL a good solution for this, then you can use either a processing-time or an event-time timer to trigger a call to onTimer, in which you can free the state. For example,

    // inside processElement1/processElement2 of the CoProcessFunction
    ctx.timerService().registerEventTimeTimer(event.getEventTime() + 60 * 1000);

registers an event-time timer for 60 seconds after the timestamp in an event; that is, take the event's timestamp, add 60 seconds, and wait until the current watermark has surpassed that point in time.

The Flink training website has tutorials [2] and exercises [3] on these topics.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#state-time-to-live-ttl
[2] https://training.ververica.com/lessons/event-time-watermarks.html
[3] https://training.ververica.com/exercises/rideEnrichment-processfunction.html

On Wed, Aug 21, 2019 at 10:59 AM Felipe Gutierrez <[hidden email]> wrote:
> [...] The next operator is a join which is using a window of 1 minute [1].
> [...]
> [1] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/valencia/ValenciaDataSkewedBloomFilterJoinExample.java#L47
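David's timer suggestion can be modelled in plain Java. This is a sketch, not Flink code: `KeyedStateWithTimers` and its methods are hypothetical stand-ins for a keyed CoProcessFunction and its TimerService. Each element buffers some per-key state and registers a timer at its timestamp plus 60 seconds; advancing the watermark fires, in order, every timer whose time the watermark has reached. The onTimer policy shown (clear the key only if no later event pushed its deadline back) is one possible choice, assumed here rather than taken from the thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java model, NOT Flink's API: names are hypothetical.
class TimerCleanupDemo {
    public static void main(String[] args) {
        KeyedStateWithTimers op = new KeyedStateWithTimers(60_000);
        op.processElement("sensor-1", 10_000, "a");   // timer at 70 000
        op.processElement("sensor-1", 40_000, "b");   // timer at 100 000
        op.advanceWatermark(70_000);   // first timer fires, but "b" extended the key's life
        System.out.println(op.stateFor("sensor-1"));
        op.advanceWatermark(100_000);  // second timer fires and clears the key
        System.out.println(op.stateFor("sensor-1"));
    }
}

class KeyedStateWithTimers {
    private final long retentionMs;
    private final Map<String, List<String>> bufferedByKey = new HashMap<>();
    private final Map<String, Long> lastEventTimeByKey = new HashMap<>();
    // Timer timestamp -> keys; a set, so one key registers a given timestamp only once.
    private final TreeMap<Long, Set<String>> eventTimeTimers = new TreeMap<>();
    private long currentWatermark = Long.MIN_VALUE;

    KeyedStateWithTimers(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    void processElement(String key, long eventTimeMs, String value) {
        bufferedByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        lastEventTimeByKey.merge(key, eventTimeMs, Long::max);
        // Analogue of registerEventTimeTimer(eventTime + 60 s).
        eventTimeTimers.computeIfAbsent(eventTimeMs + retentionMs, t -> new TreeSet<>()).add(key);
    }

    void advanceWatermark(long watermarkMs) {
        currentWatermark = Math.max(currentWatermark, watermarkMs);
        // Fire every timer whose timestamp the watermark has reached (timestamp <= watermark).
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.firstKey() <= currentWatermark) {
            Map.Entry<Long, Set<String>> due = eventTimeTimers.pollFirstEntry();
            for (String key : due.getValue()) {
                onTimer(due.getKey(), key);
            }
        }
    }

    private void onTimer(long timerTimestampMs, String key) {
        Long last = lastEventTimeByKey.get(key);
        // Clear only if no later event for this key moved the deadline.
        if (last != null && timerTimestampMs >= last + retentionMs) {
            bufferedByKey.remove(key);
            lastEventTimeByKey.remove(key);
        }
    }

    List<String> stateFor(String key) {
        return bufferedByKey.getOrDefault(key, List.of());
    }
}
```

The demo prints `[a, b]` after the first watermark and `[]` after the second. Because the timer is driven by the watermark rather than the wall clock, the cleanup happens at the same logical point whether the job reads live or historic data.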
Thanks for the detailed explanation! I removed my implementation of the watermark, which is not necessary in my case. I will only use watermarks if I am dealing with out-of-order events.

On Wed, Aug 21, 2019 at 9:09 PM David Anderson <[hidden email]> wrote:
> What Watermarks do is to advance the event time clock. [...]
If you want to use event-time processing with in-order data, then you can use an AscendingTimestampExtractor [1].

David

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamp_extractors.html#assigners-with-ascending-timestamps

On Thu, Aug 22, 2019 at 4:03 PM Felipe Gutierrez <[hidden email]> wrote:
> thanks for the detail explanation! I removed my implementation of the watermark which is not necessary in my case. I will only use Watermarkers if I am dealing with out of order events.
> [...]
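Even with in-order data, event-time windows and timers still need watermarks to advance the event-time clock; an ascending-timestamp generator simply needs no out-of-orderness bound. Below is a rough plain-Java model of such a generator (hypothetical names, not Flink's API), assuming the watermark trails the latest timestamp by one millisecond. That is my reading of how AscendingTimestampExtractor behaves; check the documentation for your Flink version.

```java
// Plain-Java model, NOT Flink's API: names are hypothetical.
class AscendingDemo {
    public static void main(String[] args) {
        AscendingWatermarks generator = new AscendingWatermarks();
        for (long ts : new long[] {1_000, 2_000, 2_000, 5_000}) {
            generator.onEvent(ts);
            System.out.println("event=" + ts + " watermark=" + generator.currentWatermark());
        }
    }
}

// For in-order data the watermark just follows the latest timestamp.
class AscendingWatermarks {
    private long currentTimestamp = Long.MIN_VALUE;
    private long violations = 0;

    void onEvent(long eventTimestampMs) {
        if (eventTimestampMs >= currentTimestamp) {
            currentTimestamp = eventTimestampMs;
        } else {
            // The data was not actually in order; a real extractor would report a
            // monotony violation here. This model only counts them.
            violations++;
        }
    }

    long currentWatermark() {
        // One millisecond behind the latest timestamp, so further events with
        // that same timestamp are not considered late.
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }

    long violations() {
        return violations;
    }
}
```

Running the demo prints watermarks 999, 1999, 1999 and 4999: the event-time clock still moves forward, just without the 10-second lag that the bounded-out-of-orderness variant would add.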