Trigger Firing for Late Window Elements

Trigger Firing for Late Window Elements

Scott Kidder
I'm using event-time windows of 1 hour that have an allowed lateness of several hours. This supports the processing of access logs that can be delayed by several hours. The windows aggregate data over the 1 hour period and write to a database sink. Pretty straightforward.

Will the event-time trigger lead to the window trigger firing for every single late element? Suppose thousands of late elements arrive simultaneously; I'd like to avoid having that lead to thousands of database updates in a short period of time. Ideally, I could batch up the late window changes and have it trigger when the window is finally closed or some processing-time duration passes (e.g. once per minute).

For reference, here's what the aggregate window definition looks like with Flink 1.5.3:

        chunkSource.keyBy(record -> record.getRecord().getEnvironmentId())
                .timeWindow(Time.hours(1))
                .allowedLateness(Time.hours(3))
                .aggregate(new EnvironmentAggregateWatchTimeFunction())
                .uid("env-watchtime-stats")
                .name("Env Watch-Time Stats")
                .addSink(new EnvironmentWatchTimeDBSink());
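
To make the batching idea concrete, here is a rough, untested sketch of the kind of custom trigger I have in mind (the LateBatchTrigger name and the interval are placeholders, not something I'm running): it behaves like the default event-time trigger for the on-time firing, but for late elements it registers a single processing-time timer so that many late elements arriving close together share one firing.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Sketch only: fire once when the watermark passes the end of the window
// (like EventTimeTrigger), then batch late elements behind a single
// processing-time timer instead of firing once per late element.
public class LateBatchTrigger extends Trigger<Object, TimeWindow> {

    private final long batchIntervalMs;
    private final ValueStateDescriptor<Long> pendingTimerDesc =
            new ValueStateDescriptor<>("late-batch-timer", Types.LONG);

    private LateBatchTrigger(long batchIntervalMs) {
        this.batchIntervalMs = batchIntervalMs;
    }

    public static LateBatchTrigger of(long batchIntervalMs) {
        return new LateBatchTrigger(batchIntervalMs);
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // Late element: schedule one processing-time timer (if none is
            // pending) so all late elements within the interval share a firing.
            ValueState<Long> pendingTimer = ctx.getPartitionedState(pendingTimerDesc);
            if (pendingTimer.value() == null) {
                long fireAt = ctx.getCurrentProcessingTime() + batchIntervalMs;
                pendingTimer.update(fireAt);
                ctx.registerProcessingTimeTimer(fireAt);
            }
            return TriggerResult.CONTINUE;
        }
        // On-time element: fire when the watermark reaches the end of the window.
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        // One firing covers every late element that arrived since the timer was set.
        ctx.getPartitionedState(pendingTimerDesc).clear();
        return TriggerResult.FIRE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ValueState<Long> pendingTimer = ctx.getPartitionedState(pendingTimerDesc);
        if (pendingTimer.value() != null) {
            ctx.deleteProcessingTimeTimer(pendingTimer.value());
            pendingTimer.clear();
        }
    }
}

It would be wired in with something like .trigger(LateBatchTrigger.of(60_000L)) between allowedLateness() and aggregate() in the pipeline above.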


Thank you,

--
Scott Kidder

Re: Trigger Firing for Late Window Elements

Hequn Cheng
Hi Scott,

Yes, the window trigger fires for every single late element.

If you only want the window to be triggered once, you can:
    - Remove the allowedLateness()
    - Use a BoundedOutOfOrdernessTimestampExtractor to emit watermarks that lag behind the elements.

The code (Scala) looks like:

import java.sql.Timestamp
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

class TimestampExtractor[T1, T2]
  extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.hours(3)) {
  override def extractTimestamp(element: (T1, T2, Timestamp)): Long = {
    element._3.getTime
  }
}
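
Applied to the pipeline from your original message, the wiring might look roughly like the Java sketch below (BoundedOutOfOrdernessTimestampExtractor lives in org.apache.flink.streaming.api.functions.timestamps); the ChunkRecord element type and its getEventTimeMillis() accessor are assumed names, since I don't know your actual record type:

        chunkSource
                // Watermarks lag 3 hours behind the largest timestamp seen,
                // replacing allowedLateness(); ChunkRecord / getEventTimeMillis()
                // are placeholders for the real element type and event time.
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<ChunkRecord>(Time.hours(3)) {
                            @Override
                            public long extractTimestamp(ChunkRecord element) {
                                return element.getEventTimeMillis();
                            }
                        })
                .keyBy(record -> record.getRecord().getEnvironmentId())
                .timeWindow(Time.hours(1))
                // No allowedLateness(): the window fires exactly once, when the
                // lagging watermark finally passes the end of the window.
                .aggregate(new EnvironmentAggregateWatchTimeFunction())
                .uid("env-watchtime-stats")
                .name("Env Watch-Time Stats")
                .addSink(new EnvironmentWatchTimeDBSink());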

Note that this will increase latency, since the window fires only once the lagging watermark passes its end.

Best, Hequn

Re: Trigger Firing for Late Window Elements

Scott Kidder
That makes sense, thank you, Hequn. I can see the tradeoff between using allowedLateness on a window, which leads to multiple firings, and using a watermark that lags by some amount of time (e.g. 3 hours), which produces only a single firing.

Thanks again,

--
Scott Kidder
