Flink Session Windows State TTL

karl.pullicino
Hi all,

We have an Apache Flink application that generates player sessions from player events keyed by playerId. Sessions are based on event time: a session is created on the first event for a player and closes after 30 minutes of inactivity. Events are merged by our custom PlayerSessionAggregator, which implements AggregateFunction.

We deployed this application on a Flink dev cluster (with checkpointing enabled), but we noticed that the state keeps growing until we hit an out-of-memory error, as shown in the attached file flink_oom_exception.txt.

We tried using PurgingTrigger together with CountTrigger, but since it uses FIRE_AND_PURGE we ended up with one session per event, i.e. events were not being merged. Using an Evictor led to the same situation, because events were being removed from the window. Hence we resorted to using State TTL:
  • We created a StateTtlConfig with an expiry of 120 minutes to periodically remove expired sessions from the state.
  • This stateTtlConfig is passed to the flatMap operator PlayerSessionEventMapper, which extends RichFlatMapFunction.
  • PlayerSessionEventMapper has a ValueStateDescriptor that provides access to the state of each player. This ValueStateDescriptor uses the stateTtlConfig mentioned above.
  • The state of each player is updated on every player event. We also force a state access (using ValueState.value()) since, as per the documentation, "expired values are only removed when they are read out explicitly, e.g. by calling ValueState.value()".
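The lazy cleanup behaviour quoted in the last bullet can be illustrated with a small plain-Java model. It is a hypothetical sketch rather than Flink's implementation: `TtlStateModel` and its methods are made-up names, time is passed in explicitly, and it assumes the TTL is refreshed on create and write (`StateTtlConfig.UpdateType.OnCreateAndWrite`) and that expired values are never returned (`StateTtlConfig.StateVisibility.NeverReturnExpired`). It leaves out Flink's optional background cleanup strategies such as `cleanupFullSnapshot()`.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Plain-Java model (NOT Flink code) of per-key TTL state, assuming
 * OnCreateAndWrite updates and NeverReturnExpired visibility.
 * Time is passed in explicitly so the example is deterministic.
 */
class TtlStateModel {
    // 120 minutes, matching the TTL described in the post.
    static final long TTL_MS = 120L * 60 * 1000;

    private static final class Entry {
        final String value;
        final long lastWriteMs;

        Entry(String value, long lastWriteMs) {
            this.value = value;
            this.lastWriteMs = lastWriteMs;
        }
    }

    private final Map<String, Entry> state = new HashMap<>();

    /** A write stores the value and restarts its TTL (like OnCreateAndWrite). */
    void update(String key, String value, long nowMs) {
        state.put(key, new Entry(value, nowMs));
    }

    /**
     * Reading an expired entry deletes it and returns null (like NeverReturnExpired).
     * Entries that are never read again simply stay in the map.
     */
    String value(String key, long nowMs) {
        Entry e = state.get(key);
        if (e == null) {
            return null;
        }
        if (nowMs - e.lastWriteMs >= TTL_MS) {
            state.remove(key);
            return null;
        }
        return e.value;
    }

    int size() {
        return state.size();
    }

    public static void main(String[] args) {
        TtlStateModel s = new TtlStateModel();
        s.update("player-1", "session-A", 0L);
        s.update("player-2", "session-B", 0L);
        long later = TTL_MS + 1;
        System.out.println(s.size());                   // 2: expiry alone removes nothing
        System.out.println(s.value("player-1", later)); // null: expired, removed on read
        System.out.println(s.size());                   // 1: player-2 was never read
    }
}
```

Two caveats follow, as far as we understand Flink's TTL: a TTL only applies to the state registered through the descriptor it is enabled on, so a TTL on the PlayerSessionEventMapper state does not shrink the state kept by the session window operator; and in the Flink versions current at the time of this thread, TTL was measured in processing time, not event time.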
This approach was based on the examples provided in:
  • https://flink.apache.org/2019/05/19/state-ttl.html
  • https://www.ververica.com/blog/state-ttl-for-apache-flink-how-to-limit-the-lifetime-of-state
  • https://www.slideshare.net/FlinkForward/time-tolive-how-to-perform-automatic-state-cleanup-in-apache-flink-andrey-zagrebin-ververica
  • https://cwiki.apache.org/confluence/display/FLINK/FLIP-25%3A+Support+User+State+TTL+Natively
Code: PlayerSessionApp.java, PlayerSessionEventMapper.java (some custom classes have been removed for simplicity)
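Since the attached code is not reproduced here, the session semantics described at the start of the post can be illustrated with a small plain-Java model. This is a hypothetical sketch, not the attached code and not Flink's implementation: `SessionModel` and its methods are made-up names, each event opens a window `[ts, ts + gap)`, overlapping or touching windows are merged (which is how event-time session windows combine), and a per-session event count stands in for the accumulator of PlayerSessionAggregator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Plain-Java model (NOT Flink code) of event-time session windows for one key:
 * each event opens a window [ts, ts + gap) and windows that overlap or touch
 * are merged. The per-session count stands in for an AggregateFunction accumulator.
 */
class SessionModel {
    static final long GAP_MS = 30L * 60 * 1000; // 30 minutes of inactivity

    /** Returns one {start, end, eventCount} triple per session, ordered by start. */
    static List<long[]> sessions(long[] eventTimes, long gapMs) {
        long[] sorted = eventTimes.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        long[] current = null;
        for (long ts : sorted) {
            if (current != null && ts <= current[1]) {
                // The event's window overlaps or touches the current session: merge.
                current[1] = Math.max(current[1], ts + gapMs);
                current[2]++;
            } else {
                // First event, or the inactivity gap was exceeded: start a new session.
                current = new long[] {ts, ts + gapMs, 1};
                result.add(current);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        long[] events = {0, 10 * min, 20 * min, 70 * min, 75 * min};
        for (long[] s : sessions(events, GAP_MS)) {
            System.out.println((s[0] / min) + "-" + (s[1] / min) + " min, " + s[2] + " events");
        }
        // 0-50 min, 3 events
        // 70-105 min, 2 events
    }
}
```

In this model a merged session keeps accumulating events. With a trigger wrapped in PurgingTrigger, each firing becomes FIRE_AND_PURGE and clears the window contents, so with a count of 1 (an assumption; the post does not state the count) every emitted result would aggregate a single event, consistent with the one-session-per-event behaviour described in the post.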

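Our questions are:
  • Are expired session windows automatically removed from state? If not, what's the best way to do it?
  • How can we query the state size?
  • How can we query the number of windows in state?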

Re: Flink Session Windows State TTL

karl.pullicino

Re: Flink Session Windows State TTL

rmetzger0
Hey Karl,

sorry for the late reply!

Let me first quickly answer your questions:
  • are expired session windows automatically removed from state? if not, what's the best way to do it?
Yes, they should get removed automatically.
  • how can we query state size?
You can monitor the state size in the Flink web UI (there is a "Checkpointing" tab for each job).
  • how can we query number of windows in state?
The window operator does not expose a metric for the number of windows it holds.
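The automatic removal mentioned in the first answer depends on event time advancing. As a hedged illustration, the plain-Java model that follows (hypothetical names, not Flink code, assuming the default event-time trigger and no custom cleanup) computes when the window operator's cleanup timer fires: at the window's last timestamp (`end - 1`) plus the allowed lateness, once the watermark reaches that value. If the watermark stalls, for example because an idle source partition holds it back, session windows are never cleaned up and their state keeps growing.

```java
/**
 * Plain-Java model (NOT Flink code) of when an event-time window's state is
 * cleared: a timer at maxTimestamp + allowedLateness, fired by the watermark.
 */
class WindowCleanupModel {
    /** Last timestamp that belongs to a window [start, end). */
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    /** Cleanup timer timestamp, saturating at Long.MAX_VALUE on overflow. */
    static long cleanupTime(long windowEnd, long allowedLatenessMs) {
        long t = maxTimestamp(windowEnd) + allowedLatenessMs;
        return t >= maxTimestamp(windowEnd) ? t : Long.MAX_VALUE;
    }

    /** An event-time timer fires once the watermark is >= its timestamp. */
    static boolean isCleanedUp(long windowEnd, long allowedLatenessMs, long watermark) {
        return watermark >= cleanupTime(windowEnd, allowedLatenessMs);
    }

    public static void main(String[] args) {
        long min = 60_000L;
        long sessionEnd = 50 * min; // e.g. last event at minute 20 plus a 30-minute gap
        System.out.println(cleanupTime(sessionEnd, 0));           // 2999999
        System.out.println(isCleanedUp(sessionEnd, 0, 49 * min)); // false
        System.out.println(isCleanedUp(sessionEnd, 0, 50 * min)); // true
        // A watermark that never advances never triggers cleanup:
        System.out.println(isCleanedUp(sessionEnd, 0, Long.MIN_VALUE)); // false
    }
}
```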

I also have some questions :)
- Have you considered using the RocksDB state backend to mitigate the out-of-memory issues?
- Why are you disabling operator chaining?
- Did you verify that the "TimeZone.setDefault(...)" setting reaches the worker JVMs executing your code? (I suspect you are only setting the time zone in the JVM that executes the main() method.)
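Regarding the last question: the function objects of a Flink job are serialized and executed on the TaskManager JVMs, so a `TimeZone.setDefault(...)` call in `main()` presumably only affects the client JVM (unless the job runs in a local, single-JVM environment). One way to avoid depending on any JVM default is to pass the zone explicitly. The hypothetical `SessionTimeFormatter` is a sketch of that idea; `Europe/Berlin` is an arbitrary example zone, since the post does not say which zone is used.

```java
import java.io.Serializable;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.TimeZone;

/**
 * Hypothetical helper: formats epoch-millisecond timestamps in an explicitly
 * supplied zone, so the result does not depend on the default time zone of
 * whichever JVM runs it. ZoneId is Serializable, so an instance can be held
 * by a function object that is shipped to the workers.
 */
class SessionTimeFormatter implements Serializable {
    private static final long serialVersionUID = 1L;
    // Static, so it is not serialized; DateTimeFormatter is immutable and thread-safe.
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");

    private final ZoneId zone;

    SessionTimeFormatter(ZoneId zone) {
        this.zone = zone;
    }

    String format(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atZone(zone).format(FORMAT);
    }

    public static void main(String[] args) {
        SessionTimeFormatter berlin = new SessionTimeFormatter(ZoneId.of("Europe/Berlin"));
        // Changing the JVM default does not affect the explicit zone.
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        System.out.println(berlin.format(0L)); // 1970-01-01 01:00
        TimeZone.setDefault(TimeZone.getTimeZone("America/New_York"));
        System.out.println(berlin.format(0L)); // 1970-01-01 01:00
    }
}
```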

Best,
Robert 



On Tue, Mar 3, 2020 at 7:23 PM karl.pullicino <[hidden email]> wrote:
Added flink_oom_exception.txt
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2426/flink_oom_exception.txt>
as I originally forgot to attach it.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Flink Session Windows State TTL

rmetzger0
Sorry, I pressed the send button too fast.

You also attached an exception to the email, which reads as follows:
Caused by: org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException: Could not fulfill slot request c28970b7cd4f68383e242703bdac81ca. Requested resource profile (ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=-1, nativeMemoryInMB=-1, networkMemoryInMB=-1, managedMemoryInMB=-1}) is unfulfillable.
This exception does not indicate that you are running out of memory at runtime; rather, it indicates that your operator can no longer be scheduled.
Can you check whether re-enabling operator chaining solves the problem?

Also, how are you deploying Flink?


On Mon, Mar 9, 2020 at 3:30 PM Robert Metzger <[hidden email]> wrote:
[...]