Time To Live-Setting for State/StateDescriptor

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Time To Live-Setting for State/StateDescriptor

Julian Bauß
Hi everybody,

does Flink offer any automated TTL-functionality for operator state?
My current solution for deleting state after some time is using a SessionWindow which triggers after a timeout and sends messages to stateful operators which then call the clear() method on their state.

This is my solution in pseudocode:

sessionEndNotifications = dataStream.window(EventTimeSessionWindows.withGap(Time.hours(1)).apply(new StateInvalidatorFunction).keyBy(...);

dataStream.connect(sessionEndNotifications)
          .flatMap(new StatefulFlatMapFunction())
          .addSink(...);

The StatefulFlatMapFunction calls state.clear() in its flatMap2 function when it receives a sessionEndNotification. The flatMap1 function contains the actual business logic which is being calculated per event.

I feel like I'm doing this in a roundabout way. Are there better ways of doing this?

Best Regards,

Julian
Reply | Threaded
Open this post in threaded view
|

Re: Time To Live-Setting for State/StateDescriptor

Aljoscha Krettek
Hi Julian,
we're aware of the issue (https://issues.apache.org/jira/browse/FLINK-3946) but unfortunately it's not implemented yet. I recently added ProcessFunction which is somewhat similar to a flatmap but also allows setting timers. This can be used to set a TTL timer and then call clear() on the state when the timer fires. This will be release in Flink 1.2.

Cheers,
Aljoscha

On Wed, 30 Nov 2016 at 17:13 Julian Bauß <[hidden email]> wrote:
Hi everybody,

does Flink offer any automated TTL-functionality for operator state?
My current solution for deleting state after some time is using a SessionWindow which triggers after a timeout and sends messages to stateful operators which then call the clear() method on their state.

This is my solution in pseudocode:

sessionEndNotifications = dataStream.window(EventTimeSessionWindows.withGap(Time.hours(1)).apply(new StateInvalidatorFunction).keyBy(...);

dataStream.connect(sessionEndNotifications)
          .flatMap(new StatefulFlatMapFunction())
          .addSink(...);

The StatefulFlatMapFunction calls state.clear() in its flatMap2 function when it receives a sessionEndNotification. The flatMap1 function contains the actual business logic which is being calculated per event.

I feel like I'm doing this in a roundabout way. Are there better ways of doing this?

Best Regards,

Julian
Reply | Threaded
Open this post in threaded view
|

AW: Time To Live-Setting for State/StateDescriptor

Bauss, Julian

Hi Aljoscha,

 

thanks for your reply.

It seems like 1.2 will be a very awesome release.

I’m looking forward to it :)

 

Best Regards,

Julian

 

Von: Aljoscha Krettek [mailto:[hidden email]]
Gesendet: Donnerstag, 1. Dezember 2016 12:41
An: [hidden email]
Betreff: Re: Time To Live-Setting for State/StateDescriptor

 

Hi Julian,

we're aware of the issue (https://issues.apache.org/jira/browse/FLINK-3946) but unfortunately it's not implemented yet. I recently added ProcessFunction which is somewhat similar to a flatmap but also allows setting timers. This can be used to set a TTL timer and then call clear() on the state when the timer fires. This will be release in Flink 1.2.

 

Cheers,

Aljoscha

 

On Wed, 30 Nov 2016 at 17:13 Julian Bauß <[hidden email]> wrote:

Hi everybody,

 

does Flink offer any automated TTL-functionality for operator state?

My current solution for deleting state after some time is using a SessionWindow which triggers after a timeout and sends messages to stateful operators which then call the clear() method on their state.

 

This is my solution in pseudocode:

 

sessionEndNotifications = dataStream.window(EventTimeSessionWindows.withGap(Time.hours(1)).apply(new StateInvalidatorFunction).keyBy(...);

 

dataStream.connect(sessionEndNotifications)

          .flatMap(new StatefulFlatMapFunction())

          .addSink(...);

 

The StatefulFlatMapFunction calls state.clear() in its flatMap2 function when it receives a sessionEndNotification. The flatMap1 function contains the actual business logic which is being calculated per event.

 

I feel like I'm doing this in a roundabout way. Are there better ways of doing this?

 

Best Regards,

 

Julian



**************************************************************************************************************

bonprix Handelsgesellschaft mbH
Sitz der Gesellschaft: Hamburg

Geschäftsführung:
Dr. Marcus Ackermann (Vorsitzender)
Markus Fuchshofen
Dr. Richard Gottwald
Dr. Kai Heck
Rien Jansen
Beiratsvorsitzender: Alexander Birken

Handelsregister AG Hamburg HR B 36 455

Adresse:

bonprix Handelsgesellschaft mbH

Haldesdorfer Str. 61
22179 Hamburg

Diese E-Mail enthält vertrauliche und/oder rechtlich geschützte Informationen.
Wenn Sie nicht der richtige Adressat sind oder diese E-Mail irrtümlich erhalten haben,
informieren Sie bitte sofort den Absender und vernichten Sie diese Mail.
Das unerlaubte Kopieren sowie die unbefugte Weitergabe dieser E-Mail ist nicht gestattet.

This e-mail may contain confidential and/or privileged information.
If you are not the intended recipient (or have received the e-mail in error)
please notify the sender immediately and delete this e-mail. Any unauthorized copying,
disclosure or distribution of the material in this e-mail is strictly forbidden.

**************************************************************************************************************