Is there a way to find the age of an element in a Global window?

HarshithBolar

Hi all,

 

I'm using Global Windows for my application with a custom trigger and custom evictor based on some conditions. Now, I also want to evict those elements from the window that have stayed there for too long, let's say 30 mins. How would I go about doing this? Is there a utility that Flink provides that lets me know what the age of an element in a window is?

 

Thanks,

Harshith

 

Re: Is there a way to find the age of an element in a Global window?

Kostas Kloudas-3
Hi Harshith,

The evictor has 2 methods: 
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

In the iterables, you have access to the elements and their timestamps, and the evictor context gives you access to the current watermark
and current processing time.

Based on this information, you can call remove() on the iterator created by the iterable to clean up the elements that you want to evict.
If you operate on event time but want to clean up based on processing time, you can put a ProcessFunction or a map before
your window operator, store System.currentTimeMillis() in the record itself, and then use the evictor together with the current processing time to clean up.
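
A minimal sketch of the age check described above, written in plain Java so it runs without Flink on the classpath. StampedEvent, AgeEviction and evictOlderThan are hypothetical names, not Flink APIs. In a real evictor the loop would presumably sit inside evictBefore or evictAfter, iterate over the TimestampedValue<T> elements, and take nowMillis from evictorContext.getCurrentProcessingTime().

```java
import java.util.Iterator;

// Hypothetical record type: an upstream map() is assumed to have stored
// System.currentTimeMillis() in ingestedAtMillis when the element arrived.
final class StampedEvent {
    final String payload;
    final long ingestedAtMillis;

    StampedEvent(String payload, long ingestedAtMillis) {
        this.payload = payload;
        this.ingestedAtMillis = ingestedAtMillis;
    }
}

final class AgeEviction {
    // 30 minutes, the example age limit from the question.
    static final long MAX_AGE_MILLIS = 30L * 60L * 1000L;

    // Removes every element whose age (nowMillis - ingestedAtMillis) exceeds
    // maxAgeMillis and returns how many elements were removed.
    // Removal goes through Iterator.remove(), as suggested in the reply.
    static int evictOlderThan(Iterable<StampedEvent> elements, long nowMillis, long maxAgeMillis) {
        int removed = 0;
        for (Iterator<StampedEvent> it = elements.iterator(); it.hasNext(); ) {
            StampedEvent event = it.next();
            if (nowMillis - event.ingestedAtMillis > maxAgeMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }
}
```

Passing the clock value in as a parameter, rather than reading it inside the loop, keeps the check deterministic and easy to test.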

I hope this helps,
Kostas


(In reply to the message from Kumar Bolar, Harshith <[hidden email]>, sent Fri, Jan 18, 2019 at 9:25 AM.)

Re: Re: Is there a way to find the age of an element in a Global window?

HarshithBolar

Thanks. That makes sense :)

 

From: Kostas Kloudas <[hidden email]>
Date: Friday, 18 January 2019 at 8:25 PM
To: Harshith Kumar Bolar <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: [External] Re: Is there a way to find the age of an element in a Global window?

 

Re: Re: Is there a way to find the age of an element in a Global window?

Manjusha Vuyyuru
Hi Kostas,

I have a similar scenario where I have to clear the window elements once they reach some count, or clear them if they are older than one hour.
I'm using the approach below and just wanted to know if it's the right way:

DataStream<Tuple4<String, String, Integer, String>> out = mappedFields
        .map(new CustomMapFunction())
        .keyBy(0, 1)
        .window(GlobalWindows.create())
        .trigger(PurgingTrigger.of(new CustomCountTrigger()))
        .evictor(TimeEvictor.of(Time.seconds(3600), true))
        .apply(new CustomWindowFunction());

In TimeEvictor.of(Time.seconds(3600), true), the second argument (doEvictAfter = true) means eviction happens after the window function is evaluated.

Please help.

Thanks,
Manju
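
A note on the TimeEvictor used above: as far as its implementation goes (worth verifying against your Flink version), it measures age against the largest element timestamp currently in the window, not against the wall clock, and it does nothing when the elements carry no timestamps. The plain-Java sketch below only illustrates that cutoff rule under this assumption. MaxTimestampEviction and evict are hypothetical names, not Flink APIs.

```java
import java.util.List;

final class MaxTimestampEviction {
    // Removes every timestamp at or before (newest timestamp - keepMillis).
    // The reference point is the newest element, so the newest element
    // itself always survives, however old it is in wall-clock terms.
    static void evict(List<Long> timestamps, long keepMillis) {
        if (timestamps.isEmpty()) {
            return;
        }
        long newest = Long.MIN_VALUE;
        for (long ts : timestamps) {
            newest = Math.max(newest, ts);
        }
        final long cutoff = newest - keepMillis;
        timestamps.removeIf(ts -> ts <= cutoff);
    }
}
```

If this assumption holds, elements that sit in a window with no newer arrivals are not evicted by age alone, which may matter for the "older than one hour" requirement.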


(In reply to the message from Kumar Bolar, Harshith <[hidden email]>, sent Fri, Jan 18, 2019 at 8:28 PM.)
