Hi,
I was recently experimenting with savepoints and various situations in which they succeed or fail. I expected this example to fail:

Basically, the first program runs with a count window. The second program is identical except that it uses a time window instead of a count window. From what I've observed, most of the time when Flink can't successfully restore a checkpoint it throws an exception saying as much. I was expecting to see that behavior here. Could someone explain why this "works" (as in, Flink accepts the second program with the savepoint from the first version of the program), and whether this is a bug?

"Flink can't successfully restore a checkpoint" should be "Flink can't successfully restore a savepoint".

On Thu, May 12, 2016 at 3:44 PM, Andrew Whitaker <[hidden email]> wrote:
Andrew Whitaker | [hidden email] -- Note: this information is confidential. It is prohibited to share, post online or otherwise publicize without Braintree's prior written consent. |
On Thu, May 12, 2016 at 10:44 PM, Andrew Whitaker <[hidden email]> wrote:
> From what I've observed, most of the time when Flink can't successfully
> restore a checkpoint it throws an exception saying as much. I was expecting
> to see that behavior here. Could someone explain why this "works" (as in,
> flink accepts the program with the savepoint from the first version of the
> program), and if this is a bug?

Hey Andrew!

Thanks for reporting this.

Flink generates operator IDs and uses these to map the state back to the same operator when restoring from a savepoint. We want these IDs to stay the same as long as the program does not change.

The ID can either be generated automatically by Flink or manually by the user.

The automatically generated ID is based on certain topology attributes like parallelism, operator placement, etc. If the attribute changes, the operator ID changes and you can't map the savepoint state back. If it stays the same, we assume that the program has not changed.

The problem in your example is that to Flink both programs look the same with respect to how the IDs are generated: the topology didn't change and both the time and count window are executed by the WindowOperator with an InternalWindowFunction.

The recommended way to work with savepoints is to skip the automatic IDs altogether and assign the IDs manually instead. You can do this via the "uid(String)" method of each operator, which gives you fine-grained control over the "versioning" of state:

env.addSource(..).uid("my-source")

vs.

env.addSource(..).uid("my-source-2")

The problem I've just noticed is that you can't specify this on WindowedStreams, but only on DataStreams, which is clearly a bug. Furthermore, it might be a good idea to special case windows when automatically generating the IDs.

I hope this helps a little with understanding the core problem. If you have further questions, feel free to ask. I will make sure to fix this soon.

– Ufuk
For a WindowedStream the uid would be set on the result of the apply/reduce/fold call. The WindowedStream itself does not represent an operation.

On Fri, 13 May 2016 at 00:20 Ufuk Celebi <[hidden email]> wrote:
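As a rough illustration of the operator ID discussion above, here is a toy model in plain Java. It is not Flink's actual hashing code: the class `OperatorIdSketch`, the `Node` fields, and the choice of hashed attributes are all assumptions made for this sketch. It only shows why two programs that differ solely in window type can end up with the same automatic ID, and how a manually assigned uid separates them.

```java
import java.util.Objects;

// Toy model only: names and hashed attributes are hypothetical, not Flink internals.
class OperatorIdSketch {

    // Hypothetical stand-in for one operator in the job topology.
    static final class Node {
        final String operatorClass;   // e.g. "WindowOperator" for both window kinds
        final int parallelism;
        final int positionInTopology;
        final String windowKind;      // "count" or "time"; deliberately NOT hashed below
        final String userUid;         // null when the user did not assign a uid

        Node(String operatorClass, int parallelism, int positionInTopology,
             String windowKind, String userUid) {
            this.operatorClass = operatorClass;
            this.parallelism = parallelism;
            this.positionInTopology = positionInTopology;
            this.windowKind = windowKind;
            this.userUid = userUid;
        }
    }

    // A user-supplied uid wins; otherwise the ID is derived from topology attributes only.
    static String operatorId(Node n) {
        if (n.userUid != null) {
            return "uid:" + n.userUid;
        }
        return "auto:" + Objects.hash(n.operatorClass, n.parallelism, n.positionInTopology);
    }

    public static void main(String[] args) {
        Node countAuto = new Node("WindowOperator", 4, 2, "count", null);
        Node timeAuto = new Node("WindowOperator", 4, 2, "time", null);
        // Same topology attributes, so the automatic IDs collide.
        System.out.println(operatorId(countAuto).equals(operatorId(timeAuto)));

        Node countUid = new Node("WindowOperator", 4, 2, "count", "count-window-v1");
        Node timeUid = new Node("WindowOperator", 4, 2, "time", "time-window-v1");
        // Distinct manual uids keep the two versions apart.
        System.out.println(operatorId(countUid).equals(operatorId(timeUid)));
    }
}
```

In a real job the equivalent of `userUid` would be the string passed to `uid(String)` on the stream returned by the apply/reduce/fold call, as described above.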
Thanks Ufuk, and thanks for explaining. The reasons behind the savepoint being restored successfully kind of make sense, but it seems like the window type (count vs. time) should be taken into account when restoring savepoints. I don't actually see anyone doing this in practice, but I would expect Flink to complain about changing windowing semantics between program versions.

On Sat, May 14, 2016 at 3:34 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi Andrew,

the reason why the program doesn't fail (and cannot fail, with the current architecture) is that the partitioned state is dynamic/lazy. For example, the count trigger might have a partitioned state called "count" that it uses to keep track of the count. The time trigger requires no state but simply reacts to watermark/processing time progress.

The system doesn't know the names and types of state that user functions or internal operators will access. And the types/names might even change over the runtime of a program. If you restore with one window type the state for the other type will simply sit around, not being queried.

Cheers,
Aljoscha

On Mon, 16 May 2016 at 17:42 Andrew Whitaker <[hidden email]> wrote:
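The lazy-state behaviour Aljoscha describes can be sketched with a small, self-contained Java model. Everything here (`LazyStateSketch`, `StateBackend`, `CountTrigger`, `TimeTrigger`) is a hypothetical simplification for illustration, not Flink's trigger or state API, and it tracks a single key only. State is looked up by name on first access, so a restored "count" entry is never rejected; it is just never read by a trigger that does not ask for it.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only: all names are hypothetical and this is not Flink's state API.
class LazyStateSketch {

    // State is addressed by name and only touched when an operator asks for it.
    static final class StateBackend {
        private final Map<String, Long> values = new HashMap<>();
        private final Set<String> accessed = new HashSet<>();

        // Restoring copies the snapshot blindly; nothing checks who will read it.
        static StateBackend restore(Map<String, Long> snapshot) {
            StateBackend backend = new StateBackend();
            backend.values.putAll(snapshot);
            return backend;
        }

        long get(String name) {
            accessed.add(name);
            return values.getOrDefault(name, 0L);
        }

        void put(String name, long value) {
            accessed.add(name);
            values.put(name, value);
        }

        Map<String, Long> snapshot() {
            return new HashMap<>(values);
        }

        // Names that were restored but never read or written since.
        Set<String> untouched() {
            Set<String> result = new HashSet<>(values.keySet());
            result.removeAll(accessed);
            return result;
        }
    }

    interface Trigger {
        boolean onElement(StateBackend state, long timestamp);
    }

    // Keeps its running count in a named state entry called "count".
    static final class CountTrigger implements Trigger {
        private final long max;

        CountTrigger(long max) {
            this.max = max;
        }

        @Override
        public boolean onElement(StateBackend state, long timestamp) {
            long count = state.get("count") + 1;
            boolean fire = count >= max;
            state.put("count", fire ? 0L : count);
            return fire;
        }
    }

    // Needs no state at all; fires purely on time progress.
    static final class TimeTrigger implements Trigger {
        private final long windowEnd;

        TimeTrigger(long windowEnd) {
            this.windowEnd = windowEnd;
        }

        @Override
        public boolean onElement(StateBackend state, long timestamp) {
            return timestamp >= windowEnd;
        }
    }

    public static void main(String[] args) {
        StateBackend first = new StateBackend();
        Trigger countTrigger = new CountTrigger(5);
        for (int i = 0; i < 3; i++) {
            countTrigger.onElement(first, i);
        }

        // "Savepoint" taken from the count-window program, restored into the time-window one.
        StateBackend restored = StateBackend.restore(first.snapshot());
        Trigger timeTrigger = new TimeTrigger(10);
        timeTrigger.onElement(restored, 4);

        // The old "count" entry is still there, simply never queried.
        System.out.println(restored.untouched());
    }
}
```

Because the backend in this model has no registry of expected state names, there is nothing for it to validate at restore time, which mirrors the reason given above for why the restore cannot fail.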