Hi,
I'm using EventTimeSessionWindows to create my session windows, and then I print the individual events using the apply function on the resulting WindowedStream.
I noticed that with .window(EventTimeSessionWindows.withGap(Time.seconds(1))) all my events are processed, but with .window(EventTimeSessionWindows.withGap(Time.seconds(5))) I lose a few events (3-4). When I increase the gap further to 30 seconds, even fewer events are processed (about 60 fewer events are printed).
What could be the reason for this, and how can I fix it?
Here is a snippet of my main program:
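import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Key each event by its channel session ID, group it into event-time sessions, and print each session
val keyed = stream
  .map(e => (e.getHeader.getChannelSessionId, e))
  .keyBy(_._1)
  .window(EventTimeSessionWindows.withGap(Time.seconds(30)))
  .apply { (csId: String, window: TimeWindow, events: Iterable[(String, Event)], out: Collector[String]) =>
    // I print the events from the iterable here
    out.collect(sessionEventsProcessor.processEvents(events))
  }
  .print()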
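One possible explanation worth checking, assuming an unbounded source whose watermark stops advancing after the last event: an event-time session only fires once the watermark passes its last event plus the gap, so a larger gap leaves more events sitting in sessions that have not closed yet. The plain-Scala sketch below (no Flink dependency) models this in a simplified way; `SessionSketch`, `sessions` and `emittedCount` are hypothetical names, and Flink's exact window-boundary handling may differ from this model.

```scala
object SessionSketch {
  // A merged session: [start, end) plus the event timestamps (ms) it contains
  final case class Session(start: Long, end: Long, events: Vector[Long])

  // Simplified model, not Flink's implementation: each event opens a window
  // [ts, ts + gap), and windows that overlap are merged into one session.
  def sessions(timestamps: Seq[Long], gapMs: Long): Vector[Session] =
    timestamps.sorted.foldLeft(Vector.empty[Session]) { (acc, ts) =>
      acc.lastOption match {
        // ts falls inside the current session: extend its end and add the event
        case Some(last) if ts < last.end =>
          acc.init :+ last.copy(end = math.max(last.end, ts + gapMs), events = last.events :+ ts)
        // otherwise the gap has elapsed, so a new session starts
        case _ =>
          acc :+ Session(ts, ts + gapMs, Vector(ts))
      }
    }

  // An event-time window fires once the watermark reaches its max timestamp (end - 1);
  // count the events in sessions that would have fired by the given watermark.
  def emittedCount(timestamps: Seq[Long], gapMs: Long, watermark: Long): Int =
    sessions(timestamps, gapMs)
      .filter(s => s.end - 1 <= watermark)
      .map(_.events.size)
      .sum

  def main(args: Array[String]): Unit = {
    val ts = Seq(0L, 500L, 2000L, 8000L, 9000L)
    // Same events and same (stalled) watermark, different session gaps
    for (gap <- Seq(1000L, 5000L, 30000L))
      println(s"gap=$gap ms -> emitted ${emittedCount(ts, gap, watermark = 9500L)} of ${ts.size}")
  }
}
```

With these five events and a watermark stuck at 9500 ms, the model emits 4 events for a 1 s gap, 3 for a 5 s gap and none for a 30 s gap, which matches the pattern of "larger gap, fewer printed events" without any event being dropped as late.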
Thank you,
Aditi