Hi All,
I have a use case where in I'm supposed to work with Session Windows to maintain some values for some sessionIDs/keys. The use case is as follows: I need to maintain a session window for the incoming data and discard the window after some set gap/period of inactivity but what I want is that as soon as new element gets added to the window, all the records that are currently in the window get processed using the window transformation/function and the window does not get discarded. The "Session windows implementation" as get processed only after the window is consider complete(based on some gap time settings). But I wish to process the all the elements contained in the window as soon as a new element gets added to the window(means addition of a new element triggers the processing of all elements of the window) but the discarding of the window happens only if there is a gap/inactivity for some set time. And when the window gets discarded/expires I don't want it to be re-evaluated, since it's contents were processed when the last element was added to the window. Is this implementation possible? If yes, can someone please share some sample code to explain the implementation. Thank you! Regards, Anchit |
Hi Anchit, I think you need a customized EventTimeTrigger which returns "TriggerResult.FIRE" both on new element and watermark. Thanks, Manu Zhang On Fri, Oct 21, 2016 at 3:08 PM Anchit Jatana <[hidden email]> wrote:
|
Here is a session trigger that I wrote (not quite the same rules around what a
session is, but should hopefully be a good start to work from). I'd love to get any feedback on how it could be improved. - bart import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor, ValueState, ValueStateDescriptor} Van: Manu Zhang [[hidden email]]
Verzonden: vrijdag 21 oktober 2016 10:52 Aan: [hidden email] Onderwerp: Re: Session windows - Evaluation of addition of element + window expires/gets discarded after some set time of inactivity Hi Anchit,
I think you need a customized EventTimeTrigger which returns "TriggerResult.FIRE"
both on new element and watermark.
Thanks,
Manu Zhang
On Fri, Oct 21, 2016 at 3:08 PM Anchit Jatana <[hidden email]> wrote:
|
Hi Bart,
Thank you so much for sharing the approach. Looks like this solved my problem. Here's what I have as an implementation for my use-case: package org.apache.flink.quickstart import org.apache.flink.api.common.state.{ ReducingState, ReducingStateDescriptor, ValueState, ValueStateDescriptor } import org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext import org.apache.flink.streaming.api.windowing.triggers.{ Trigger, TriggerResult } import org.apache.flink.streaming.api.windowing.windows.Window import org.slf4j.LoggerFactory class sessionTrigger[E](val sessionPauseHours: Long) extends Trigger[E, Window] { val timeState = new ValueStateDescriptor[Option[Long]]("sessionTimer", classOf[Option[Long]], None) override def onElement(t: E, l: Long, w: Window, triggerContext: TriggerContext): TriggerResult = { // remove old timer val time_state: ValueState[Option[Long]] = triggerContext.getPartitionedState(timeState) val time_set = time_state.value() if (time_set.isDefined) { triggerContext.deleteProcessingTimeTimer(time_set.get) } // set new time and continue val new_time = triggerContext.getCurrentProcessingTime + Time.seconds(sessionPauseHours).toMilliseconds() time_state.update(Some(new_time)) triggerContext.registerProcessingTimeTimer(new_time) TriggerResult.FIRE } override def onProcessingTime(l: Long, w: Window, triggerContext: TriggerContext): TriggerResult = { TriggerResult.PURGE } override def onEventTime(l: Long, w: Window, triggerContext: TriggerContext): TriggerResult = { TriggerResult.CONTINUE } } Regards, Anchit |
Hi Bart, are you using your custom Trigger together with a merging session window assigner? You might want to consider overriding the clear() method in your trigger to clean up the state that you use. If you don't you might run into memory leaks because the state is never cleaned up. Cheers, Aljoscha On Sat, 22 Oct 2016 at 07:06 Anchit Jatana <[hidden email]> wrote: Hi Bart, |
Hi Aljoscha,
I am using the custom trigger with GlobalWindows window assigner. Do I still need to override clear method and delete the ProcessingTimeTimer using- triggerContext.deleteProcessingTimeTimer(prevTime)? Regards, Anchit |
Hi Anchit, the timers don't necessarily have to be cleaned up. So you should be good to go. Cheers, Aljoscha On Fri, 28 Oct 2016 at 23:33 Anchit Jatana <[hidden email]> wrote: Hi Aljoscha, |
Free forum by Nabble | Edit this page |