Hi all,
I have an incoming stream of event objects, each carrying its session ID. I am writing a task that aggregates the events by session. The general logic looks like this:
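import org.apache.flink.streaming.api.scala._

case class Event(sessionId: Int, data: String)
case class Session(id: Int, var events: List[Event])

val events: DataStream[Event] = ... // some source
events
  // partition the stream so each session ID gets its own keyed state
  .keyBy((event: Event) => event.sessionId)
  .mapWithState((event: Event, state: Option[Session]) => {
    // start a new session on the first event for this key
    val session = state.getOrElse(Session(id = event.sessionId, events = List()))
    session.events = session.events :+ event
    // emit the updated session and keep it as the new state
    (session, Some(session))
  })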
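To make the behaviour of the keyed mapWithState step concrete without a Flink cluster, here is a minimal plain-Scala simulation. It is only a model, not Flink code: an immutable Map stands in for the per-key state, and the `SessionSim.aggregate` helper is hypothetical. It shows that every session ever seen stays in the state, which is exactly the growth problem described below.

```scala
// Flink-free simulation of keyBy(sessionId) + mapWithState.
// The Map plays the role of per-key state; no entry is ever removed.
case class Event(sessionId: Int, data: String)
case class Session(id: Int, events: List[Event])

object SessionSim {
  // Hypothetical helper: folds events into per-session state and returns
  // (every session emitted after each event, final state).
  def aggregate(events: Seq[Event]): (List[Session], Map[Int, Session]) =
    events.foldLeft((List.empty[Session], Map.empty[Int, Session])) {
      case ((emitted, state), event) =>
        // same logic as the lambda passed to mapWithState
        val session = state.getOrElse(event.sessionId, Session(event.sessionId, Nil))
        val updated = session.copy(events = session.events :+ event)
        (emitted :+ updated, state.updated(event.sessionId, updated))
    }
}
```

For example, three events for two sessions leave two entries in the state, and those entries remain there for as long as the job runs.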
The problem is that there is no reliable way of knowing the end of a session, since events are likely to get lost. If I keep this process running, the number of stored sessions will keep growing until it fills up the disk.
Is there a recommended way of periodically evicting sessions that are too old (e.g. a day old)?
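A sketch of one possible approach, under stated assumptions: store a last-seen timestamp with each session and periodically drop any session whose latest event is older than a TTL (one day here). The `SessionStore` class, the `timestampMillis`/`lastSeenMillis` fields and the `evictExpired` method are hypothetical and model the idea in plain Scala only; no Flink API is used. Inside Flink itself, the comparable mechanisms would be keyed state with a TTL (`StateTtlConfig`, in newer Flink versions) or a `KeyedProcessFunction` that registers a timer per key and clears the state when it fires. This is not necessarily the recommended Flink approach.

```scala
// Hypothetical, Flink-free model of TTL-based session eviction.
final case class Event(sessionId: Int, data: String, timestampMillis: Long)
final case class Session(id: Int, events: List[Event], lastSeenMillis: Long)

// Mutable per-key store standing in for Flink keyed state.
final class SessionStore(ttlMillis: Long) {
  private val sessions = scala.collection.mutable.Map.empty[Int, Session]

  // Append the event to its session and refresh the last-seen time.
  def add(event: Event): Session = {
    val current = sessions.getOrElse(
      event.sessionId, Session(event.sessionId, Nil, event.timestampMillis))
    val updated = current.copy(
      events = current.events :+ event,
      lastSeenMillis = math.max(current.lastSeenMillis, event.timestampMillis))
    sessions.update(event.sessionId, updated)
    updated
  }

  // Remove every session whose last event is more than ttlMillis before nowMillis.
  // Returns the evicted sessions so the caller can emit or log them.
  def evictExpired(nowMillis: Long): List[Session] = {
    val expired = sessions.values.filter(s => nowMillis - s.lastSeenMillis > ttlMillis).toList
    expired.foreach(s => sessions.remove(s.id))
    expired
  }

  def size: Int = sessions.size
}

object SessionStore {
  val OneDayMillis: Long = 24L * 60 * 60 * 1000
}
```

In a real job the eviction would be driven by a timer or by state TTL rather than by an explicit call; calling `evictExpired` here just makes the cleanup step visible.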
Thanks,
Jack