Hi all,

I'm using Global Windows for my application, with a custom trigger and a custom evictor based on some conditions. Now I also want to evict elements that have stayed in the window for too long, let's say 30 minutes. How would I go about doing this? Is there a utility that Flink provides that tells me the age of an element in a window?

Thanks,
Harshith
|
Hi Harshith,

The evictor has 2 methods:

    void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

In the iterables, you have access to the elements and their timestamps, and the evictor context gives you access to the current watermark and the current processing time. Based on this information, you can call remove() on the iterator created by the iterable and clean up the elements that you want to remove.

If you operate on event time and you want to clean up based on processing time, then you can put a ProcessFunction or a map before your window operator, store System.currentTimeMillis() in the record itself, and then use the evictor and the current processing time to clean up.

I hope this helps,
Kostas
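For illustration, here is a minimal sketch of the removal loop described above, written in plain Java so it runs without Flink on the classpath. The class AgeEvictionSketch, the Stamped type, and the evictOlderThan helper are hypothetical names, not Flink API; Stamped only stands in for Flink's TimestampedValue<T>. Inside a real evictor, the same loop would presumably sit in evictBefore or evictAfter, with "now" taken from evictorContext.getCurrentProcessingTime() and the element time taken from TimestampedValue.getTimestamp() or from a field stamped upstream.

```java
import java.util.Iterator;

class AgeEvictionSketch {

    /** Hypothetical stand-in for Flink's TimestampedValue<T>: a value plus a timestamp in milliseconds. */
    static final class Stamped<T> {
        final T value;
        final long timestamp;

        Stamped(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /**
     * Removes every element whose age (now - timestamp) exceeds maxAgeMs,
     * using Iterator.remove() as suggested in the reply above.
     * Returns the number of elements removed.
     */
    static <T> int evictOlderThan(Iterable<Stamped<T>> elements, long now, long maxAgeMs) {
        int removed = 0;
        for (Iterator<Stamped<T>> it = elements.iterator(); it.hasNext(); ) {
            Stamped<T> element = it.next();
            if (now - element.timestamp > maxAgeMs) {
                it.remove(); // drop the stale element from the underlying collection
                removed++;
            }
        }
        return removed;
    }
}
```

For the 30-minute case from the question, maxAgeMs would be 30L * 60 * 1000. The comparison is only meaningful when "now" and the element timestamps come from the same clock, which is why the reply suggests stamping records with the processing time when the job otherwise runs on event time.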
|
Thanks. That makes sense :)
|
Hi Kostas,

I have a similar scenario where I have to clear window elements upon reaching some count, or clear the elements if they are older than one hour. I'm using the approach below and just wanted to know if it's the right way:

    DataStream<Tuple4<String, String, Integer, String>> out = mappedFields
        .map(new CustomMapFunction())
        .keyBy(0, 1)
        .window(GlobalWindows.create())
        .trigger(PurgingTrigger.of(new CustomCountTrigger()))
        .evictor(TimeEvictor.of(Time.seconds(3600), true))
        .apply(new CustomWindowFunction());

Here TimeEvictor.of(Time.seconds(3600), true) is meant to evict after the window function is evaluated.

Please help.

Thanks,
Manju
|