Re: Trigger Firing for Late Window Elements

Posted by Hequn Cheng on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Trigger-Firing-for-Late-Window-Elements-tp23971p23976.html

Hi Scott,

Yes, the window trigger firing for every single late element.

If you only want the window to be triggered once, you can:
    - Remove the allowedLateness()
    - Use BoundedOutOfOrdernessTimestampExtractor to emit Watermarks that lag behind the element. 

The code(scala) looks like:
class TimestampExtractor[T1, T2]
  extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.hours(3))  {
  override def extractTimestamp(element: (T1, T2, Timestamp)): Long = {
    element._3.getTime
  }
}

Pay attention to that this will increase the latency since only trigger firing for the last element.

Best, Hequn

On Sat, Oct 20, 2018 at 1:15 AM Scott Kidder <[hidden email]> wrote:
I'm using event-time windows of 1 hour that have an allowed lateness of several hours. This supports the processing of access logs that can be delayed by several hours. The windows aggregate data over the 1 hour period and write to a database sink. Pretty straightforward.

Will the event-time trigger lead to the window trigger firing for every single late element? Suppose thousands of late elements arrive simultaneously; I'd like to avoid having that lead to thousands of database updates in a short period of time. Ideally, I could batch up the late window changes and have it trigger when the window is finally closed or some processing-time duration passes (e.g. once per minute).

For reference, here's what the aggregate window definition looks like with Flink 1.5.3:

        chunkSource.keyBy(record -> record.getRecord().getEnvironmentId())
                .timeWindow(Time.hours(1))
                .allowedLateness(Time.hours(3))
                .aggregate(new EnvironmentAggregateWatchTimeFunction())
                .uid("env-watchtime-stats")
                .name("Env Watch-Time Stats")
                .addSink(new EnvironmentWatchTimeDBSink());


Thank you,

--
Scott Kidder