Force triggering events on watermark


Srikanth
Hi,

I read the following in the Flink docs: "We can explicitly specify a Trigger to overwrite the default Trigger provided by the WindowAssigner. Note that specifying a triggers does not add an additional trigger condition but replaces the current trigger."
So I tested the code below with a count trigger. As I understand it, this overrides the default watermark-based trigger.

// Assumed definition: the post does not show `f`; a parser for this timestamp format is implied.
val f = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

val testStream = env.fromCollection(List(
    ("2016-04-07 13:11:59", 157428, 4),
    ("2016-04-07 13:11:59", 157428, 4),
    ("2016-04-07 13:11:59", 111283, 23),
    ("2016-04-07 13:11:57", 108042, 23),
    ("2016-04-07 13:12:00", 161374, 9),
    ("2016-04-07 13:12:00", 161374, 9),
    ("2016-04-07 13:11:59", 136505, 4)
  ))
  .assignAscendingTimestamps(b => f.parse(b._1).getTime())
  .map(b => (b._3, b._2))

testStream.print

val countStream = testStream
  .keyBy(_._1)
  .timeWindow(Time.seconds(20))
  .trigger(CountTrigger.of(3))
  .fold((0, List[Int]())) { case ((k, r), i) => (i._1, r ++ List(i._2)) }

countStream.print

The output I saw confirms the documented behavior: processing is triggered only once a key has 3 elements.
How do I force the leftover records to be processed when the watermark passes the end of the window? That is, I want to use triggers to start processing early, but finalize the window based on the watermark.

The output shows that the records for keys 23 and 9 weren't processed:
  (4,157428)
  (4,157428)
  (23,111283)
  (23,108042)
  (9,161374)
  (9,161374)
  (4,136505)

  (4,List(157428, 157428, 136505))
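
The result above can be reproduced with a small plain-Java model that has no Flink dependency. It is only a sketch: the class `CountTriggerModel` and its methods are hypothetical names, the timestamps are read as UTC (an assumption, although the 20-second alignment does not depend on the time zone), and window starts are computed as `timestamp - (timestamp % size)`, i.e. aligned to the epoch. Under those assumptions, 13:11:57 and 13:11:59 fall into the window starting at 13:11:40 and 13:12:00 starts the next window, so keys 23 and 9 each reach only two elements in their window and a count-only trigger of 3 never fires for them.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of 20-second tumbling windows with a count-only trigger.
public class CountTriggerModel {
    static final long WINDOW_MS = 20_000L;
    static final int COUNT = 3;
    static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // The (time, value, key) records from the post.
    static final Object[][] SAMPLE = {
        {"2016-04-07 13:11:59", 157428, 4},
        {"2016-04-07 13:11:59", 157428, 4},
        {"2016-04-07 13:11:59", 111283, 23},
        {"2016-04-07 13:11:57", 108042, 23},
        {"2016-04-07 13:12:00", 161374, 9},
        {"2016-04-07 13:12:00", 161374, 9},
        {"2016-04-07 13:11:59", 136505, 4},
    };

    // Assumption: timestamps are interpreted as UTC.
    static long toMillis(String s) {
        return LocalDateTime.parse(s, FMT).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Start of the epoch-aligned tumbling window that contains the timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    // Feeds the records through the model and returns what a count trigger would emit.
    static List<String> run(Object[][] records) {
        Map<String, List<Integer>> panes = new LinkedHashMap<>();
        List<String> fired = new ArrayList<>();
        for (Object[] r : records) {
            long start = windowStart(toMillis((String) r[0]));
            String pane = r[2] + "@" + start; // one pane per (key, window)
            List<Integer> values = panes.computeIfAbsent(pane, k -> new ArrayList<>());
            values.add((Integer) r[1]);
            if (values.size() % COUNT == 0) { // fire on every third element
                fired.add("(" + r[2] + "," + values + ")");
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        run(SAMPLE).forEach(System.out::println);
    }
}
```

Only the pane for key 4 reaches the count of three; nothing in this model ever looks at the watermark, which is why the other two panes stay silent.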

Thanks,
Srikanth
Re: Force triggering events on watermark

Fabian Hueske-2
Maybe the last example of this blog post is helpful [1].

Best, Fabian

(In reply to Srikanth's message of 2016-05-10 17:24 GMT+02:00.)

Re: Force triggering events on watermark

Srikanth
Yes, that will work.
I was trying another route: a "finalize & purge" trigger that will
   i) onElement: register an event-time timer for the end of the window, without altering the nested trigger's TriggerResult
  ii) onEventTime: always purge after firing

That will work with CountTrigger and other custom triggers too, right?

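import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class FinalizePurgingTrigger<T, W extends Window> extends Trigger<T, W> {
    // The wrapped trigger (e.g. CountTrigger) that decides on early firing.
    private final Trigger<T, W> nestedTrigger;

    public FinalizePurgingTrigger(Trigger<T, W> nestedTrigger) {
        this.nestedTrigger = nestedTrigger;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        // Window has no getEnd(); maxTimestamp() is available on every Window type.
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return nestedTrigger.onElement(element, timestamp, window, ctx);
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        // CountTrigger returns CONTINUE from onEventTime, so the end-of-window
        // timer has to fire and purge here rather than rely on the nested result.
        if (time == window.maxTimestamp()) {
            return TriggerResult.FIRE_AND_PURGE;
        }
        TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
        switch (triggerResult) {
            case FIRE:
            case FIRE_AND_PURGE:
                return TriggerResult.FIRE_AND_PURGE;
            default:
                return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return nestedTrigger.onProcessingTime(time, window, ctx);
    }
}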
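
To make the intended behavior concrete, here is a plain-Java sketch of the decision logic, written without any Flink classes so it can be run on its own. Everything in it is hypothetical (`FinalizeOnWatermarkModel`, `Pane`, `replay`, and the local `Result` enum that merely mirrors the names of Flink's `TriggerResult` values). It assumes a single window covering timestamps 0 to 19999, a count threshold of 3, and that the end-of-window timer goes off once the watermark reaches the window's maximum timestamp.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: early firing by count, final firing when the watermark passes the window.
public class FinalizeOnWatermarkModel {
    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    // One window pane for one key.
    static class Pane {
        final long maxTimestamp;      // last timestamp that belongs to the window
        final int countThreshold;     // early-firing threshold
        final List<Integer> contents = new ArrayList<>();
        int sinceLastFire = 0;

        Pane(long maxTimestamp, int countThreshold) {
            this.maxTimestamp = maxTimestamp;
            this.countThreshold = countThreshold;
        }

        // Early firing: FIRE keeps the contents, it does not purge them.
        Result onElement(int value) {
            contents.add(value);
            sinceLastFire++;
            if (sinceLastFire >= countThreshold) {
                sinceLastFire = 0;
                return Result.FIRE;
            }
            return Result.CONTINUE;
        }

        // Final firing: once the watermark reaches the end of the window, emit and purge.
        Result onWatermark(long watermark) {
            return watermark >= maxTimestamp ? Result.FIRE_AND_PURGE : Result.CONTINUE;
        }
    }

    // Replays the values of one key, then advances the watermark once.
    static List<String> replay(int key, int[] values, long maxTimestamp, long finalWatermark) {
        Pane pane = new Pane(maxTimestamp, 3);
        List<String> emitted = new ArrayList<>();
        for (int v : values) {
            if (pane.onElement(v) == Result.FIRE) {
                emitted.add("early (" + key + "," + pane.contents + ")");
            }
        }
        if (pane.onWatermark(finalWatermark) == Result.FIRE_AND_PURGE) {
            emitted.add("final (" + key + "," + pane.contents + ")");
            pane.contents.clear();
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Key 23 never reaches three elements, so it is emitted only by the final firing.
        System.out.println(replay(23, new int[] {111283, 108042}, 19_999L, 20_000L));
        // Key 4 fires early and then once more when the window is finalized.
        System.out.println(replay(4, new int[] {157428, 157428, 136505}, 19_999L, 20_000L));
    }
}
```

One consequence visible in the model: a key that already fired early is emitted again at finalization, because the early firing keeps the pane's contents. Downstream consumers would need to treat the final result as an update of the early one.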

Srikanth

(In reply to Fabian Hueske's message of Tue, May 10, 2016 at 11:36 AM.)

Re: Force triggering events on watermark

Aljoscha Krettek
Yes, this should work.

(In reply to Srikanth's message of Tue, 10 May 2016 at 19:01.)