[DISCUSS] Changing Window Cleanup Semantics

Aljoscha Krettek
Hi,
I recently created https://issues.apache.org/jira/browse/FLINK-4994 to address what I think is a flaw in the window cleanup semantics. This change could affect existing users, so I'd like to get some opinions and also give people a heads-up.

Before going into what I'm proposing in the issue, let's first look at how window cleanup works in Flink 1.1. There are three pieces of state that need to be cleaned up: 1) window contents (the actual elements), 2) the merging window set (only in the case of merging windows) and 3) trigger state. The WindowOperator is responsible for cleaning up the first two, while the Trigger itself is responsible for the third. For this purpose we have the method Trigger.clear(), which must clean up all state the Trigger created.
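To make the Trigger.clear() contract concrete, here is a toy model, not Flink code. In a real Flink trigger the state would come from TriggerContext.getPartitionedState() and the timers from registerEventTimeTimer()/deleteEventTimeTimer(). Here, a plain map and set stand in for them, and the class names (ToyTriggerContext, ToyCountTrigger) are hypothetical. The point is that clear() must undo everything the trigger created, including timers, not only its counters.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical stand-in for a per-window trigger context (not Flink's TriggerContext).
final class ToyTriggerContext {
    final Map<String, Long> state = new HashMap<>();       // trigger-owned state
    final TreeSet<Long> eventTimeTimers = new TreeSet<>(); // timers registered by the trigger
}

// Hypothetical trigger: fires once maxCount elements have arrived in the window.
final class ToyCountTrigger {
    private final long maxCount;

    ToyCountTrigger(long maxCount) { this.maxCount = maxCount; }

    /** Returns true if the window should fire for this element. */
    boolean onElement(long windowMaxTimestamp, ToyTriggerContext ctx) {
        ctx.eventTimeTimers.add(windowMaxTimestamp);          // piece of trigger state: a timer
        long count = ctx.state.merge("count", 1L, Long::sum); // piece of trigger state: a counter
        return count >= maxCount;
    }

    /** Must remove everything onElement created: the counter and the timer. */
    void clear(long windowMaxTimestamp, ToyTriggerContext ctx) {
        ctx.state.remove("count");
        ctx.eventTimeTimers.remove(windowMaxTimestamp);
    }
}
```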

In Flink 1.1 we clear 1 and 2 and call Trigger.clear() when either of two things happens: a Trigger returns PURGE (or FIRE_AND_PURGE; below I'll only say PURGE but mean both) or the allowed lateness expires (the watermark passes the end of the window plus the allowed lateness). This leads to surprising results when you use session windows (as outlined in the linked issue). People also seem to have been working around the fact that Trigger.clear() is called on PURGE, because they want to clear the window contents but still keep some state in the trigger in case more elements arrive.
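The 1.1 rule can be sketched as a single predicate. This is a hedged illustration, not Flink's WindowOperator. It assumes the window exposes an inclusive maxTimestamp (end - 1, as Flink's TimeWindow.maxTimestamp() does) and that cleanup happens once the watermark reaches maxTimestamp + allowedLateness. ToyTriggerResult and CleanupRule11 are hypothetical names.

```java
// Hypothetical mirror of the trigger results mentioned in the post.
enum ToyTriggerResult {
    CONTINUE, FIRE, PURGE, FIRE_AND_PURGE;

    boolean isPurge() { return this == PURGE || this == FIRE_AND_PURGE; }
}

final class CleanupRule11 {
    private CleanupRule11() {}

    /** Earliest watermark at which the window counts as garbage (assumes maxTimestamp = end - 1). */
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long t = windowMaxTimestamp + allowedLateness;
        // Guard against overflow for windows that end close to Long.MAX_VALUE.
        return t >= windowMaxTimestamp ? t : Long.MAX_VALUE;
    }

    /**
     * Flink 1.1 behaviour as described above: contents, merging window set and
     * trigger state are all cleared if the trigger purges OR the allowed lateness has expired.
     */
    static boolean clearsAllState(ToyTriggerResult result, long watermark,
                                  long windowMaxTimestamp, long allowedLateness) {
        return result.isPurge()
                || watermark >= cleanupTime(windowMaxTimestamp, allowedLateness);
    }
}
```

The first operand of the `||` is the part the proposal changes: under 1.1 a PURGE alone is enough to wipe the merging window set and the trigger state.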

For Flink 1.2 I'm proposing not to clear the merging window set on PURGE and also not to call Trigger.clear() on PURGE. Those two should only happen when we garbage collect the window, i.e. when we reach the end of the allowed lateness. As a consequence, people who have been relying on cleanup happening after a PURGE will now have to clean up state manually. I still think it's a good change, though, since it leads to saner behaviour for merging windows.
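A toy comparison of the two semantics, not Flink code, may help. Each window carries the three pieces of state from above. A PURGE always drops the contents, but only the 1.1 variant also forgets the merging-window-set entry and wipes the trigger state. Garbage collection clears everything in both. PurgeSemantics and ToyWindowState are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical switch between the 1.1 behaviour and the proposed 1.2 behaviour.
enum PurgeSemantics { FLINK_1_1, PROPOSED_1_2 }

final class ToyWindowState {
    final List<String> contents = new ArrayList<>();        // 1) window contents
    boolean inMergingWindowSet = true;                       // 2) entry in the merging window set
    final Map<String, Long> triggerState = new HashMap<>();  // 3) trigger state

    /** Called when the trigger returns PURGE or FIRE_AND_PURGE. */
    void onPurge(PurgeSemantics semantics) {
        contents.clear(); // contents are dropped under both semantics
        if (semantics == PurgeSemantics.FLINK_1_1) {
            inMergingWindowSet = false; // later elements no longer merge with this session
            triggerState.clear();       // models Trigger.clear() being called on PURGE
        }
    }

    /** Called once the watermark passes the end of the window plus allowed lateness. */
    void onGarbageCollection() {
        contents.clear();
        inMergingWindowSet = false;
        triggerState.clear();
    }
}
```

Under the proposed semantics, a real trigger that wants its state gone after a PURGE would have to remove it itself before returning PURGE, for example by calling clear() on the state it obtained from TriggerContext.getPartitionedState().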

What do you think?

There is also the possibility of adding a Trigger.onPurge() method that by default calls clear(), but in the long run I'd like to remove that again, so at some point people would have to adapt to the changes anyway. In my opinion it's better to make this breaking change now, and cleanly.
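The transitional onPurge() option could look roughly like this. It is a hypothetical sketch: onPurge() is only an idea raised here, not an existing Flink method, and SketchTrigger, DefaultPurgeTrigger and KeepStateTrigger are illustrative names. The operator would call onPurge() on PURGE and clear() only at garbage collection. The default keeps the 1.1 behaviour, and a trigger opts into the new behaviour by overriding it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical base class; per-window state is keyed by a window name for simplicity.
abstract class SketchTrigger {
    protected final Map<String, Long> state = new HashMap<>();

    /** Must remove all state this trigger created for the window (called at garbage collection). */
    void clear(String window) { state.remove(window); }

    /** Hypothetical transitional hook called on PURGE; defaults to the 1.1 behaviour. */
    void onPurge(String window) { clear(window); }
}

/** Inherits the default, so its state is still wiped on every PURGE. */
final class DefaultPurgeTrigger extends SketchTrigger {}

/** Opts into the proposed semantics: state survives PURGE and is removed only by clear(). */
final class KeepStateTrigger extends SketchTrigger {
    @Override
    void onPurge(String window) {
        // Intentionally empty: keep trigger state in case more elements arrive.
    }
}
```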

Cheers,
Aljoscha