I'm using event-time windows of 1 hour that have an allowed lateness of several hours. This supports the processing of access logs that can be delayed by several hours. The windows aggregate data over the 1 hour period and write to a database sink. Pretty straightforward. Will the event-time trigger lead to the window trigger firing for every single late element? Suppose thousands of late elements arrive simultaneously; I'd like to avoid having that lead to thousands of database updates in a short period of time. Ideally, I could batch up the late window changes and have it trigger when the window is finally closed or some processing-time duration passes (e.g. once per minute). For reference, here's what the aggregate window definition looks like with Flink 1.5.3: chunkSource.keyBy(record -> record.getRecord().getEnvironmentId()) .timeWindow(Time.hours(1)) .allowedLateness(Time.hours(3)) .aggregate(new EnvironmentAggregateWatchTimeFunction()) .uid("env-watchtime-stats") .name("Env Watch-Time Stats") .addSink(new EnvironmentWatchTimeDBSink()); Thank you, -- Scott Kidder |
Hi Scott, Yes, the window trigger firing for every single late element. If you only want the window to be triggered once, you can: - Remove the allowedLateness() - Use BoundedOutOfOrdernessTimestampExtractor to emit Watermarks that lag behind the element. The code(scala) looks like: class TimestampExtractor[T1, T2] Pay attention to that this will increase the latency since only trigger firing for the last element. Best, Hequn On Sat, Oct 20, 2018 at 1:15 AM Scott Kidder <[hidden email]> wrote:
|
That makes sense, thank you, Hequn. I can see the tradeoff between using allowedLateness on a window to trigger multiple firings, versus a window with a watermark lagging some amount of time (e.g. 3 hours) that has only a single firing. Thanks again, -- Scott Kidder On Fri, Oct 19, 2018 at 7:51 PM Hequn Cheng <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |