http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/DISCUSS-Allowed-Lateness-in-Flink-tp5911.html
Hi Folks,
as part of my effort to improve the windowing in Flink [1] I also thought about lateness, accumulating/discarding and window cleanup. I have some ideas on this but I would love to get feedback from the community as I think that these things are important for everyone doing event-time windowing on Flink.
The basic problem is this: Some elements can arrive behind the watermark if the watermark is not 100 % correct (which it is not, in most cases, I would assume). We need to provide an API that lets users specify what happens when these late elements arrive. There are two main knobs for the user here:
- Allowed Lateness: How late can an element be before it is completely ignored, i.e. simply discarded
- Accumulating/Discarding Fired Windows: When we fire a window, do we purge the contents or do we keep them around until the watermark passes the end of the window plus the allowed lateness? If we keep the window, a late element will be added to the window and the window will be emitted again. If we don't keep the window, then the late element will essentially trigger emission of a one-element window.
This is somewhat straightforward to implement: if accumulating, set a timer for the end of the window plus the allowed lateness, and clean up the window when that timer fires (basically). All of this happens in event time, driven by watermarks.
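To make the two knobs concrete, here is a minimal sketch in plain Java. It is not Flink's API: the class LatenessSketch, its methods, and the simplification that a window [start, end) fires once the watermark reaches end are all assumptions made for illustration. It uses tumbling windows, drops elements whose window end plus allowed lateness is already behind the watermark, and either keeps (accumulating) or purges (discarding) window contents after firing.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, self-contained model of allowed lateness; not Flink code.
class LatenessSketch {
    private static final class WindowState {
        final List<String> elements = new ArrayList<>();
        boolean fired = false; // true once the on-time (or first late) firing happened
    }

    private final long windowSize;
    private final long allowedLateness;
    private final boolean accumulating;
    private long watermark = Long.MIN_VALUE;
    private int dropped = 0;
    private final TreeMap<Long, WindowState> windows = new TreeMap<>(); // keyed by window start
    private final List<String> emitted = new ArrayList<>();

    LatenessSketch(long windowSize, long allowedLateness, boolean accumulating) {
        this.windowSize = windowSize;
        this.allowedLateness = allowedLateness;
        this.accumulating = accumulating;
    }

    void element(String value, long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, windowSize);
        long end = start + windowSize;
        if (end + allowedLateness <= watermark) {
            dropped++; // too late: the window was already cleaned up
            return;
        }
        WindowState state = windows.computeIfAbsent(start, k -> new WindowState());
        state.elements.add(value);
        if (end <= watermark) {
            fire(start, state); // late but within allowed lateness: emit again right away
        }
    }

    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        Iterator<Map.Entry<Long, WindowState>> it = windows.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, WindowState> entry = it.next();
            long end = entry.getKey() + windowSize;
            if (!entry.getValue().fired && end <= watermark) {
                fire(entry.getKey(), entry.getValue()); // regular on-time firing
            }
            if (end + allowedLateness <= watermark) {
                it.remove(); // the "cleanup timer": end of window plus allowed lateness
            }
        }
    }

    private void fire(long start, WindowState state) {
        emitted.add(start + ":" + state.elements);
        state.fired = true;
        if (!accumulating) {
            state.elements.clear(); // discarding mode purges contents on every firing
        }
    }

    List<String> emitted() { return emitted; }
    int dropped() { return dropped; }

    public static void main(String[] args) {
        LatenessSketch sketch = new LatenessSketch(10, 5, true);
        sketch.element("a", 1);
        sketch.element("b", 2);
        sketch.advanceWatermark(10); // fires window [0, 10)
        sketch.element("c", 3);      // late, but within the allowed lateness of 5
        sketch.advanceWatermark(15); // cleanup of window [0, 10)
        sketch.element("d", 4);      // beyond allowed lateness: dropped
        System.out.println(sketch.emitted()); // prints [0:[a, b], 0:[a, b, c]]
    }
}
```

With the third constructor argument set to false (discarding), the same input sequence would emit 0:[a, b] followed by the one-element window 0:[c]. In both modes the element "d" is dropped.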
My problem is only this: what should happen if the user specifies some allowed lateness and/or accumulating mode but uses processing-time windowing? For processing-time windows these settings don't make sense because elements cannot be late by definition. The problem is that we cannot figure out, by looking at a WindowAssigner or the Windows that it assigns to elements, whether these windows are in the event-time or the processing-time domain. At the API level this is also not easily visible, since a user might have set the "stream-time-characteristic" to event-time but still use a processing-time window (plus trigger) in the program.
Any ideas for solving this are extremely welcome. :-)
Cheers,
Aljoscha