State in the Scala DataStream API

Juan Gentile

Hello,


I'm looking at the following page of the documentation

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html

particularly at this piece of code:


val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })

How is the state cleared/purged in this case for keys that no longer appear?


Thank you,

Juan

Re: State in the Scala DataStream API

Fabian Hueske
Hi Juan,

The state of a key is purged if the function returns None instead of a Some(...) as the new state.
However, this only happens when the function is called for that key, i.e., when a new record with that key arrives; state is not removed automatically after some time.
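A minimal sketch of the first point, assuming the Flink Scala DataStream API (flink-streaming-scala) is on the classpath: a variant of the documented counter that returns None as the new state once a key's running sum reaches a threshold, which drops that key's state. The object name PurgeOnNone, the helper update, the limit of 100 and the sample input are all hypothetical. Keys that never reach the limit, or simply stop receiving records, keep their state indefinitely.

```scala
import org.apache.flink.streaming.api.scala._

object PurgeOnNone {
  // Hypothetical threshold at which a key's counter is dropped.
  val Limit = 100

  // (record, current state) => (output record, new state).
  // Returning None as the new state removes the state of the current key.
  def update(in: (String, Int), count: Option[Int]): ((String, Int), Option[Int]) = {
    val total = count.getOrElse(0) + in._2
    val newState: Option[Int] = if (total >= Limit) None else Some(total)
    ((in._1, total), newState)
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Hypothetical sample input: (key, increment)
    val stream: DataStream[(String, Int)] =
      env.fromElements(("a", 60), ("a", 50), ("a", 5), ("b", 1))

    val counts: DataStream[(String, Int)] = stream
      .keyBy(_._1)
      .mapWithState((in: (String, Int), count: Option[Int]) => update(in, count))

    counts.print()
    env.execute("purge-on-none sketch")
  }
}
```

With this input, the second "a" record pushes the sum to 110 and returns None, so the third "a" record starts again from an empty state. The state update is kept in a plain function so its behaviour can be checked without running a job.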
If you need the state of inactive keys to be removed after some time, you have to implement a ProcessFunction and use timers to clean up the state manually.
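A minimal sketch of the timer-based cleanup, assuming a Flink version that provides KeyedProcessFunction (the keyed variant of ProcessFunction) and the open(Configuration) lifecycle method. The class name CountWithIdleCleanup, the one-hour timeout and the sample input are hypothetical. Each record re-arms a processing-time timer; a timer only clears the state if no newer record for the key arrived in the meantime.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Running sum per key; a key's state is cleared once no record for it has
// arrived for idleTimeoutMs milliseconds of processing time.
class CountWithIdleCleanup(val idleTimeoutMs: Long)
    extends KeyedProcessFunction[String, (String, Int), (String, Int)] {

  // Boxed types so that "no state" shows up as null rather than 0.
  @transient private var sum: ValueState[java.lang.Integer] = _
  @transient private var deadline: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Integer]("sum", classOf[java.lang.Integer]))
    deadline = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("deadline", classOf[java.lang.Long]))
  }

  override def processElement(
      in: (String, Int),
      ctx: KeyedProcessFunction[String, (String, Int), (String, Int)]#Context,
      out: Collector[(String, Int)]): Unit = {
    val total = Option(sum.value()).map(_.intValue).getOrElse(0) + in._2
    sum.update(total)
    out.collect((in._1, total))

    // Push the cleanup deadline forward and register a timer for it.
    val cleanupAt = ctx.timerService().currentProcessingTime() + idleTimeoutMs
    deadline.update(cleanupAt)
    ctx.timerService().registerProcessingTimeTimer(cleanupAt)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, (String, Int), (String, Int)]#OnTimerContext,
      out: Collector[(String, Int)]): Unit = {
    // Older timers are stale: only the latest deadline clears the state.
    val d = deadline.value()
    if (d != null && timestamp == d.longValue) {
      sum.clear()
      deadline.clear()
    }
  }
}

object IdleCleanupJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements(("a", 1), ("b", 2), ("a", 3)) // hypothetical input
      .keyBy(_._1)
      .process(new CountWithIdleCleanup(60 * 60 * 1000L)) // hypothetical 1h timeout
      .print()
    env.execute("idle-key cleanup sketch")
  }
}
```

Comparing the timer's timestamp with the stored deadline avoids having to delete earlier timers, so stale timers fire but do nothing.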

Best, Fabian

(Reply to Juan Gentile's message of 2018-08-08 19:02 GMT+02:00.)