Interesting window behavior with savepoints

Andrew Whitaker
Hi,

I was recently experimenting with savepoints and various situations in which they succeed or fail. I expected this example to fail:


Basically, the first program runs with a count window. The second program is identical except that it uses a time window instead of a count window.

From what I've observed, most of the time when Flink can't successfully restore a checkpoint it throws an exception saying as much. I was expecting to see that behavior here. Could someone explain why this "works" (as in, Flink accepts the second program with the savepoint from the first version of the program), and whether this is a bug?

Thanks,

--
Andrew Whitaker | [hidden email]

Re: Interesting window behavior with savepoints

Andrew Whitaker
"Flink can't successfully restore a checkpoint" should be "Flink can't successfully restore a savepoint".

--
Andrew Whitaker | [hidden email]
--
Note: this information is confidential. It is prohibited to share, post online or otherwise publicize without Braintree's prior written consent.

Re: Interesting window behavior with savepoints

Ufuk Celebi
On Thu, May 12, 2016 at 10:44 PM, Andrew Whitaker
<[hidden email]> wrote:
> From what I've observed, most of the time when Flink can't successfully
> restore a checkpoint it throws an exception saying as much. I was expecting
> to see that behavior here. Could someone explain why this "works" (as in,
> flink accepts the program with the savepoint from the first version of the
> program), and if this is a bug?

Hey Andrew! Thanks for reporting this.

Flink generates operator IDs and uses these to map the state back to
the same operator when restoring from a savepoint. We want these IDs
to stay the same as long as the program does not change.

The ID can either be generated automatically by Flink or manually by the user.

The automatically generated ID is based on certain topology attributes
like parallelism, operator placement, etc. If any of these attributes
changes, the operator ID changes and you can't map the savepoint state
back. If they all stay the same, we assume that the program has not
changed.

The problem in your example is that to Flink both programs look the
same with respect to how the IDs are generated: the topology didn't
change and both the time and count window are executed by the
WindowOperator with an InternalWindowFunction.
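
To make the collision concrete, here is a toy model in plain Java. It is
not Flink's actual ID generation: the OperatorNode class, its fields and
the autoId function are all hypothetical, and the only assumption taken
from the explanation above is that the ID depends on topology attributes
and not on the window type.

```java
import java.util.Objects;

// Toy model only: this is NOT Flink's real ID generation algorithm.
final class AutoIdSketch {

    // Hypothetical view of an operator as an ID generator might see it.
    static final class OperatorNode {
        final String operatorClass;    // e.g. "WindowOperator"
        final int parallelism;
        final int positionInTopology;
        final String windowKind;       // "count" or "time"; ignored by autoId below

        OperatorNode(String operatorClass, int parallelism,
                     int positionInTopology, String windowKind) {
            this.operatorClass = operatorClass;
            this.parallelism = parallelism;
            this.positionInTopology = positionInTopology;
            this.windowKind = windowKind;
        }
    }

    // Derives an ID from topology attributes only; windowKind plays no part.
    static int autoId(OperatorNode n) {
        return Objects.hash(n.operatorClass, n.parallelism, n.positionInTopology);
    }

    public static void main(String[] args) {
        OperatorNode countWindow = new OperatorNode("WindowOperator", 4, 2, "count");
        OperatorNode timeWindow = new OperatorNode("WindowOperator", 4, 2, "time");
        OperatorNode rescaled = new OperatorNode("WindowOperator", 8, 2, "count");

        // Same topology, different window type: the IDs collide.
        System.out.println(autoId(countWindow) == autoId(timeWindow)); // true
        // Changed parallelism: the ID changes.
        System.out.println(autoId(countWindow) == autoId(rescaled));   // false
    }
}
```

In this sketch the count and time variants are indistinguishable to the
generator, which mirrors why the savepoint from the first program is
accepted by the second.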

The recommended way to work with savepoints is to skip the automatic
IDs altogether and assign the IDs manually instead. You can do this
via the "uid(String)" method of each operator, which gives you
fine-grained control over the "versioning" of state:

env.addSource(..).uid("my-source")

vs.

env.addSource(..).uid("my-source-2")
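
As a rough illustration of what "versioning" means here, the following
plain-Java sketch treats a savepoint as a map from operator ID to state.
The map layout, the restore helper and the "offset=42" value are
assumptions made up for the example; how a real Flink job reacts to
savepoint state that no operator claims is not modeled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model only: a savepoint as a map from operator ID to opaque state.
final class UidRestoreSketch {

    // Looks up the state stored under the given operator ID, if any.
    static Optional<String> restore(Map<String, String> savepoint, String uid) {
        return Optional.ofNullable(savepoint.get(uid));
    }

    public static void main(String[] args) {
        Map<String, String> savepoint = new HashMap<>();
        savepoint.put("my-source", "offset=42"); // written by program version 1

        // Version 2 keeps the ID: the old state is found again.
        System.out.println(restore(savepoint, "my-source").isPresent());   // true
        // Version 2 renames the ID: the operator starts without the old state.
        System.out.println(restore(savepoint, "my-source-2").isPresent()); // false
    }
}
```

Keeping the ID stable carries state across program versions; changing it
is a deliberate way to say "this operator's state is no longer
compatible".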

The problem I've just noticed is that you can't specify this on
WindowedStreams, but only on DataStreams, which is clearly a bug.
Furthermore, it might be a good idea to special-case windows when
automatically generating the IDs.

I hope this helps a little with understanding the core problem. If you
have further questions, feel free to ask. I will make sure to fix this
soon.

– Ufuk

Re: Interesting window behavior with savepoints

Aljoscha Krettek
For a WindowedStream the uid would be set on the result of the apply/reduce/fold call. The WindowedStream itself does not represent an operation.

Re: Interesting window behavior with savepoints

Andrew Whitaker
Thanks Ufuk.

Thanks for explaining. The reasons behind the savepoint being restored successfully kind of make sense, but it seems like the window type (count vs. time) should be taken into account when restoring savepoints. I don't actually see anyone doing this, but I would expect Flink to complain about changing windowing semantics between program versions.

Re: Interesting window behavior with savepoints

Aljoscha Krettek
Hi Andrew,
The reason why the program doesn't fail (and cannot fail, with the current architecture) is that the partitioned state is dynamic/lazy. For example, the count trigger might have a partitioned state called "count" that it uses to keep track of the count. The time trigger requires no state but simply reacts to watermark/processing-time progress. The system doesn't know in advance the names and types of state that user functions or internal operators will access, and the types/names might even change over the runtime of a program.

If you restore with one window type the state for the other type will simply sit around, not being queried.
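
A small plain-Java sketch of this lazy behavior, under stated
assumptions: StateStore, Trigger, countTrigger and timeTrigger are
hypothetical stand-ins, not Flink classes, and the only thing taken from
the explanation above is that state is looked up by name on first access
and that the time trigger accesses none.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: named state that is created lazily on first access.
final class LazyStateSketch {

    // Hypothetical state store: state name -> value.
    static final class StateStore {
        private final Map<String, Long> states;

        StateStore(Map<String, Long> restored) {
            this.states = new HashMap<>(restored);
        }

        // Returns the named state, creating it with 0 if it does not exist yet.
        long getOrCreate(String name) {
            return states.computeIfAbsent(name, n -> 0L);
        }

        void put(String name, long value) {
            states.put(name, value);
        }

        Map<String, Long> snapshot() {
            return new HashMap<>(states);
        }
    }

    // Returns true when the window should fire for the current element.
    interface Trigger {
        boolean onElement(StateStore store);
    }

    // Keeps its progress in a state called "count" and fires every maxCount elements.
    static Trigger countTrigger(long maxCount) {
        return store -> {
            long seen = store.getOrCreate("count") + 1;
            if (seen >= maxCount) {
                store.put("count", 0L);
                return true;
            }
            store.put("count", seen);
            return false;
        };
    }

    // Never touches the store; in this sketch it simply never fires on elements.
    static Trigger timeTrigger() {
        return store -> false;
    }

    public static void main(String[] args) {
        // First program: count window, two elements seen, then "savepoint".
        StateStore first = new StateStore(new HashMap<>());
        Trigger count = countTrigger(3);
        count.onElement(first);
        count.onElement(first);
        Map<String, Long> savepoint = first.snapshot();

        // Second program: time window restored from the same savepoint.
        StateStore second = new StateStore(savepoint);
        timeTrigger().onElement(second);

        // Nothing fails; the "count" state is still there, just never queried.
        System.out.println(second.snapshot()); // {count=2}
    }
}
```

Because nothing declares up front which state names an operator will
use, the restore step in this model has no basis on which to reject the
leftover "count" entry.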

Cheers,
Aljoscha
