Generate timestamps in front of event for event time windows

Jason Brelloch
Hey guys,

I am trying to use event time along with a custom window to capture a subset of events.  The problem I am running into is that the event that generates the timestamp/watermark seems to arrive in the window before the onEventTime() call that closes the window is made.  Example:

Window is supposed to capture 5 minutes of events after first event arrives
Event 1: timestamp 12:01 - registers event timer for 12:06
Event 2: timestamp 12:03
Event 3: timestamp 12:20 - fires and purges window

I get all three events in the window, instead of just the two that are really within the 5-minute window.

Is there some way to force the timestamp to arrive in the window before the event that generated it?

Thanks!

--
Jason Brelloch | Product Developer
3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305 
Subscribe to the BetterCloud Monitor - Get IT delivered to your inbox
Re: Generate timestamps in front of event for event time windows

Jason Brelloch
A little more info.  Here is a simplified version of my trigger: (windowConfiguration.timespan is the duration of the window)

// Simplified trigger: the first element seen for a key registers one event-time
// timer at (event time + window timespan); the window fires when that timer fires.
class CustomTrigger extends Trigger[QualifiedEvent, Window] {

  // Timestamp of the registered timer; the default of 0 means "no timer yet".
  val stateTimeDescr = new ValueStateDescriptor[Long]("relevantTimestamp", classOf[Long], 0)

  override def onElement(event: QualifiedEvent, timestamp: Long, window: Window, ctx: TriggerContext): TriggerResult = {

    val relevantTimestamp = ctx.getPartitionedState(stateTimeDescr)
    // windowConfigDescr is a state descriptor whose definition is omitted from this simplified version.
    val windowConfigurationState = ctx.getPartitionedState(windowConfigDescr)
    var windowConfiguration = windowConfigurationState.value()
    if (windowConfiguration == null) {
      windowConfiguration = event.alertConfiguration.window.get
      windowConfigurationState.update(windowConfiguration)
    }

    // Register the closing timer only once, on the first element.
    if (relevantTimestamp.value() == 0) {
      val fireAt = event.event.created.toEpochMilli + windowConfiguration.timespan.toMillis
      ctx.registerEventTimeTimer(fireAt)
      relevantTimestamp.update(fireAt)
    }

    TriggerResult.CONTINUE
  }

  // Called once the watermark passes the registered timer: emit and clear the window.
  override def onEventTime(timestamp: Long, window: Window, ctx: TriggerContext): TriggerResult = {
    TriggerResult.FIRE_AND_PURGE
  }

  // Processing time is not used by this trigger.
  override def onProcessingTime(timestamp: Long, window: Window, ctx: TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }
}

And here is the actual window execution:

val stream = env.fromCollection(inputEvents)
        .assignAscendingTimestamps((e: QualifiedEvent) => { e.event.created.toEpochMilli })
        .keyBy((e: QualifiedEvent) => { e.alertConfiguration.alertId.toString })
        .window(GlobalWindows.create)
        .trigger(ConfigurableTrigger.create)
        .apply(new GrouperFunction).name("Grouper Function")

Oddly enough, when I do this with just a basic time window, it works and I get only the two events I am supposed to:

val stream = env.fromCollection(inputEvents)
        .assignAscendingTimestamps((e: QualifiedEvent) => { e.event.created.toEpochMilli })
        .keyBy((e: QualifiedEvent) => { e.alertConfiguration.alertId.toString })
        .timeWindow(Time.minutes(5))
        .apply(new GrouperFunction).name("Grouper Function")
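
A likely reason the basic time window behaves differently is that a tumbling event-time window assigns each element by its own timestamp, not by when it arrives, so the 12:20 event lands in a different window. The following is a minimal, Flink-free sketch of that assignment rule, assuming non-negative timestamps, windows aligned to time zero, and minutes since 12:00 as the time unit. The names `TumblingAssignment` and `windowStart` are hypothetical and are not part of the Flink API.

```scala
object TumblingAssignment {
  // Start of the aligned window of length `size` that contains `timestamp`.
  // Assumes timestamp >= 0 and windows aligned to time zero.
  def windowStart(timestamp: Long, size: Long): Long =
    timestamp - (timestamp % size)

  def main(args: Array[String]): Unit = {
    val size = 5L // window length in minutes
    // Timestamps are minutes since 12:00, as in the example from the thread.
    val events = Seq("e1" -> 1L, "e2" -> 3L, "e3" -> 20L)
    val assigned = events.map { case (name, ts) => name -> windowStart(ts, size) }
    println(assigned) // prints List((e1,0), (e2,0), (e3,20))
  }
}
```

Note that these windows are aligned to the clock rather than to the first event: events at 12:04 and 12:06 would fall into different windows even though they are only two minutes apart, so this matches the "5 minutes after the first event" requirement only for some inputs.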


On Wed, Aug 3, 2016 at 2:29 PM, Jason Brelloch <[hidden email]> wrote:
[...]

Re: Generate timestamps in front of event for event time windows

Aljoscha Krettek
Hi,
A watermark cannot be sent before the element that causes it to be emitted. A watermark of time T tells the system that no element with a timestamp of T or less will arrive in the future, so it cannot precede the element it is derived from. It seems that what you are trying to achieve can be solved by using session windows, which will be part of the upcoming 1.1 release: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#session-windows
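
The ordering described above can be illustrated with a small, Flink-free simulation. It is only a sketch under simplifying assumptions: a single global window, a watermark that follows every element directly with value (timestamp - 1), and timestamps expressed as minutes since 12:00. The names `WatermarkOrdering`, `Event`, and `simulate` are hypothetical and do not exist in Flink.

```scala
object WatermarkOrdering {
  final case class Event(name: String, minute: Long)

  // Mimics one global window whose first element registers a timer at
  // (timestamp + span). Returns the contents of every firing.
  def simulate(events: Seq[Event], span: Long): Vector[Vector[String]] = {
    var buffer = Vector.empty[String]
    var timer: Option[Long] = None
    var firings = Vector.empty[Vector[String]]

    for (e <- events) {
      // 1. The element itself is added to the window first.
      buffer = buffer :+ e.name
      if (timer.isEmpty) timer = Some(e.minute + span)

      // 2. Only afterwards does the watermark derived from it advance time.
      val watermark = e.minute - 1
      if (timer.exists(_ <= watermark)) {
        firings = firings :+ buffer // fire ...
        buffer = Vector.empty       // ... and purge
        timer = None
      }
    }
    firings
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(Event("e1", 1), Event("e2", 3), Event("e3", 20))
    println(simulate(events, span = 5)) // prints Vector(Vector(e1, e2, e3))
  }
}
```

Because the third element is buffered before its watermark can fire the timer, it ends up in the same firing as the first two, which matches the behaviour reported in the original question.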

Cheers,
Aljoscha

On Wed, 3 Aug 2016 at 12:19 Jason Brelloch <[hidden email]> wrote:
[...]

Re: Generate timestamps in front of event for event time windows

Jason Brelloch
Thanks Aljoscha,

Looking forward to the 1.1 release.  I managed to solve my problem using this example code:

(courtesy of Vladimir Stoyak)

I had to create a custom window and window assigner.  Hopefully that will help someone else.
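
For readers without access to the linked example, the semantics being asked for can be sketched in plain Scala. This is not the referenced code and not a Flink window assigner. It is only an illustration, assuming the events are already sorted by timestamp, of grouping by timestamp into windows that open at the first event and cover [start, start + span). The names `AnchoredWindows` and `group` are hypothetical.

```scala
object AnchoredWindows {
  // Groups (name, timestamp) pairs into windows anchored at the first
  // timestamp seen; each window covers [start, start + span).
  // Assumes `events` is sorted by timestamp.
  def group(events: Seq[(String, Long)], span: Long): Vector[Vector[String]] =
    events.foldLeft(Vector.empty[(Long, Vector[String])]) {
      case (acc, (name, ts)) =>
        acc.lastOption match {
          // The timestamp still falls inside the currently open window.
          case Some((start, names)) if ts < start + span =>
            acc.init :+ (start -> (names :+ name))
          // Otherwise this event opens a new window anchored at its timestamp.
          case _ =>
            acc :+ (ts -> Vector(name))
        }
    }.map(_._2)

  def main(args: Array[String]): Unit = {
    // Minutes since 12:00, as in the example from the first message.
    val events = Seq("e1" -> 1L, "e2" -> 3L, "e3" -> 20L)
    println(group(events, span = 5)) // prints Vector(Vector(e1, e2), Vector(e3))
  }
}
```

The key difference from the trigger-based attempt is that membership is decided by each element's timestamp rather than by whether it arrived before the timer fired.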

On Wed, Aug 3, 2016 at 8:35 PM, Aljoscha Krettek <[hidden email]> wrote:
[...]