Hey guys,
I am trying to use event time along with a custom window to capture a subset of events. The problem I am running into is that the event that generates the timestamp/watermark seems to arrive in the window before the onEventTime() call that closes the window is made.

Example: the window is supposed to capture 5 minutes of events after the first event arrives.

Event 1: timestamp 12:01 - registers event timer for 12:06
Event 2: timestamp 12:03
Event 3: timestamp 12:20 - fires and purges window

I get all three events in the window, instead of just the two that are really within the 5-minute window.

Is there some way to force the timestamp to arrive in the window before the event that generated it?

Thanks!

Jason Brelloch | Product Developer
A little more info. Here is a simplified version of my trigger (windowConfiguration.timespan is the duration of the window):

    class CustomTrigger extends Trigger[QualifiedEvent, Window] {

      val stateTimeDescr = new ValueStateDescriptor[Long]("relevantTimestamp", classOf[Long], 0)

      override def onElement(event: QualifiedEvent, timestamp: Long, W: Window, ctx: TriggerContext): TriggerResult = {
        val relevantTimestamp = ctx.getPartitionedState(stateTimeDescr)
        val windowConfigurationState = ctx.getPartitionedState(windowConfigDescr)
        var windowConfiguration = windowConfigurationState.value()

        if (windowConfiguration == null) {
          windowConfigurationState.update(event.alertConfiguration.window.get)
          windowConfiguration = event.alertConfiguration.window.get
        }

        if (relevantTimestamp.value() == 0) {
          ctx.registerEventTimeTimer(event.event.created.toEpochMilli + windowConfiguration.timespan.toMillis)
          relevantTimestamp.update(event.event.created.toEpochMilli + windowConfiguration.timespan.toMillis)
        }

        TriggerResult.CONTINUE
      }

      override def onEventTime(timestamp: Long, W: Window, ctx: TriggerContext): TriggerResult = {
        TriggerResult.FIRE_AND_PURGE
      }

      override def onProcessingTime(timestamp: Long, W: Window, ctx: TriggerContext): TriggerResult = {
        TriggerResult.CONTINUE
      }
    }

And here is the actual window execution:

    val stream = env.fromCollection(inputEvents)
      .assignAscendingTimestamps((e: QualifiedEvent) => { e.event.created.toEpochMilli })
      .keyBy((e: QualifiedEvent) => { e.alertConfiguration.alertId.toString })
      .window(GlobalWindows.create)
      .trigger(ConfigurableTrigger.create)
      .apply(new GrouperFunction).name("Grouper Function")

Oddly enough, when I do this with just a basic window function it works, and I only get the two events I am supposed to:

    val stream = env.fromCollection(inputEvents)
      .assignAscendingTimestamps((e: QualifiedEvent) => { e.event.created.toEpochMilli })
      .keyBy((e: QualifiedEvent) => { e.alertConfiguration.alertId.toString })
      .timeWindow(Time.minutes(5))
      .apply(new GrouperFunction).name("Grouper Function")

On Wed, Aug 3, 2016 at 2:29 PM, Jason Brelloch <[hidden email]> wrote:
Hi,

a watermark cannot be sent before the element that makes you send that watermark. A watermark of time T tells the system that no element will arrive in the future with timestamp T or less, thus you cannot send it before.

It seems that what you are trying to achieve can be solved by using session windows, which will be part of the upcoming 1.1 release: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#session-windows

Cheers,
Aljoscha

On Wed, 3 Aug 2016 at 12:19 Jason Brelloch <[hidden email]> wrote:
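To make the ordering described in this reply concrete, here is a minimal sketch in plain Scala with no Flink dependency. It only simulates the sequence "element enters the window, then the watermark derived from it advances event time and fires due timers". The names `WatermarkOrderingSketch`, `Event` and `run` are hypothetical, timestamps are in minutes since midnight, and the watermark is assumed to equal the latest element timestamp, which is a simplification of what a real timestamp assigner emits.

```scala
// Plain-Scala simulation of a global window with one event-time timer.
// All names are hypothetical; this is not Flink code.
object WatermarkOrderingSketch {
  final case class Event(name: String, timestampMinutes: Int)

  // Returns the window contents at the moment the timer fires,
  // or an empty Vector if the timer never fires.
  def run(events: Seq[Event], windowMinutes: Int): Vector[Event] = {
    var buffer = Vector.empty[Event]         // elements currently in the window
    var timer: Option[Int] = None            // registered event-time timer
    var fired: Option[Vector[Event]] = None  // snapshot taken when the timer fires

    for (e <- events) {
      if (fired.isEmpty) {
        // Step 1: the element is added to the window first (like onElement).
        buffer = buffer :+ e
        if (timer.isEmpty) timer = Some(e.timestampMinutes + windowMinutes)

        // Step 2: only afterwards does the watermark derived from this
        // element advance event time and fire a due timer (like onEventTime).
        val watermark = e.timestampMinutes // simplifying assumption
        if (timer.exists(_ <= watermark)) fired = Some(buffer)
      }
    }
    fired.getOrElse(Vector.empty)
  }
}
```

Running it with events at 12:01, 12:03 and 12:20 (721, 723 and 740 minutes) and a 5-minute window returns all three events, matching the behavior reported at the top of the thread: the 12:20 element is already buffered by the time the 12:06 timer fires.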
Thanks Aljoscha,

Looking forward to the 1.1 release. I managed to solve my problem using this example code: (courtesy of Vladimir Stoyak)

I had to create a custom window and window assigner. Hopefully that will help someone else.

On Wed, Aug 3, 2016 at 8:35 PM, Aljoscha Krettek <[hidden email]> wrote:
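The example code referred to above is not reproduced in this thread. As a rough illustration of the general idea only, the plain-Scala sketch below decides window membership from each element's own timestamp, so an element stamped after the window's end starts a new window instead of joining the old one. It is not Vladimir Stoyak's code and not a Flink `WindowAssigner`; the names `FirstEventWindowSketch`, `Event`, `Window` and `assign` are hypothetical, and the input is assumed to be a finite batch that can be sorted by timestamp.

```scala
// Plain-Scala sketch: windows start at the first event and span a fixed duration.
// All names are hypothetical; this is not Flink code.
object FirstEventWindowSketch {
  final case class Event(name: String, timestampMillis: Long)
  final case class Window(start: Long, end: Long) // covers [start, end)

  def assign(events: Seq[Event], spanMillis: Long): Vector[(Window, Vector[Event])] = {
    val sorted = events.sortBy(_.timestampMillis)
    sorted.foldLeft(Vector.empty[(Window, Vector[Event])]) { (acc, e) =>
      acc.lastOption match {
        // The element's timestamp falls inside the latest window: add it there.
        case Some((w, es)) if e.timestampMillis < w.end =>
          acc.init :+ (w -> (es :+ e))
        // Otherwise this element opens a new window starting at its own timestamp.
        case _ =>
          val w = Window(e.timestampMillis, e.timestampMillis + spanMillis)
          acc :+ (w -> Vector(e))
      }
    }
  }
}
```

With the three events from the original question and a 5-minute span, this yields one window holding the 12:01 and 12:03 events and a second window holding only the 12:20 event. In a streaming job the first window could still only be emitted once a watermark passes its end.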