Periodically evicting internal states when using mapWithState()

Jack Huang-2
Hi all,

I have an incoming stream of event objects, each carrying its session ID. I am writing a job that aggregates the events by session. The general logic looks like this:

case class Event(sessionId: Int, data: String)
case class Session(id: Int, var events: List[Event])

val events = ... // some source
events
  .keyBy((event: Event) => event.sessionId)
  .mapWithState((event: Event, state: Option[Session]) => {
    // Start a new session on the first event seen for this key
    val session = state.getOrElse(Session(id = event.sessionId, events = List()))
    session.events = session.events :+ event
    // Emit the updated session and store it as the new state
    (session, Some(session))
  })
The problem is that there is no reliable way of knowing the end of a session, since events are likely to get lost. If I keep this process running, the number of stored sessions will keep growing until it fills up the disk. 

Is there a recommended way of periodically evicting sessions that are too old (e.g. a day old)?  



Thanks,
Jack
Re: Periodically evicting internal states when using mapWithState()

Aljoscha Krettek
Hi Jack,
Right now this is not possible except by writing a custom operator. We are working on support for a time-to-live setting on state, which should solve your problem.

For writing a custom operator, check out DataStream.transform() and StreamMap, which is the operator implementation for Map. Please let me know if you have any further questions.
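
To make the idea concrete, here is a rough sketch of the bookkeeping such a custom operator would need: remember when each session was last seen, and periodically drop the ones that have been idle too long. This is plain Scala with no Flink dependency, and it is not how Flink stores state internally. SessionStore, lastSeenMillis, maxIdleMillis and evictExpired are hypothetical names introduced only for illustration, and Session is changed here to be immutable and to carry a timestamp.

```scala
// Plain-Scala sketch of idle-session eviction. No Flink API is used;
// SessionStore and its members are hypothetical, illustrative names.
case class Event(sessionId: Int, data: String)
case class Session(id: Int, events: List[Event], lastSeenMillis: Long)

class SessionStore(maxIdleMillis: Long) {
  private val sessions = scala.collection.mutable.Map.empty[Int, Session]

  // Append the event to its session and refresh the last-seen timestamp.
  def update(event: Event, nowMillis: Long): Session = {
    val previous = sessions.get(event.sessionId).map(_.events).getOrElse(Nil)
    val updated = Session(event.sessionId, previous :+ event, nowMillis)
    sessions(event.sessionId) = updated
    updated
  }

  // Remove every session idle for longer than maxIdleMillis and
  // return the ids that were evicted.
  def evictExpired(nowMillis: Long): List[Int] = {
    val expired = sessions.values
      .filter(s => nowMillis - s.lastSeenMillis > maxIdleMillis)
      .map(_.id)
      .toList
    expired.foreach(id => sessions.remove(id))
    expired
  }

  def size: Int = sessions.size
}

// Usage: session 1 is last seen at t = 0, session 2 one day later.
val day = 24L * 60 * 60 * 1000
val store = new SessionStore(maxIdleMillis = day)
store.update(Event(1, "a"), nowMillis = 0L)
store.update(Event(2, "b"), nowMillis = day)
// Sweeping just after the one-day mark drops only session 1.
val evicted = store.evictExpired(nowMillis = day + 1)
```

Inside an actual operator the sweep would have to be driven by a timer or by incoming elements, and the map would have to live in checkpointed state rather than on the heap; both are left out of this sketch.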

Best,
Aljoscha

On Tue, 7 Jun 2016 at 03:05 Jack Huang <[hidden email]> wrote:
> [...]