Expected behaviour when changing operator parallelism but starting from an incremental checkpoint


Expected behaviour when changing operator parallelism but starting from an incremental checkpoint

Aaron Levin
Hi,

What's the expected behaviour of:

* changing an operator's parallelism
* deploying this change from an incremental (RocksDB) checkpoint instead of a savepoint

The Flink docs [0][1] are a little unclear about the expected behaviour here. I understand that the key-space partitioning changes because the parallelism changes. I've seen instances where this happens and the job does not fail. But how does it treat potentially missing state for a given key?

I know I can test this, but I'm curious what the _expected_ behaviour is, i.e. what behaviour I can rely on that won't change between versions or releases. Do we expect the job to fail? Do we expect missing keys to just be considered empty?
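To make the key-space part of the question concrete, here is a rough sketch of the key-group scheme as I understand it from the rescalable-state design (the function names and the hash are illustrative, not Flink's actual API). Keys hash into a fixed number of key groups (the max parallelism), so a key's group never changes when the parallelism does; only the subtask that owns the group changes:

```python
import zlib

def key_group(key, max_parallelism):
    # Flink hashes the key's hashCode with murmur; crc32 stands in here.
    return zlib.crc32(str(key).encode()) % max_parallelism

def operator_index(max_parallelism, parallelism, group):
    # Contiguous assignment of key groups to operator subtasks.
    return group * parallelism // max_parallelism

MAX_P = 128  # max parallelism, fixed for the lifetime of the job
key = "user-42"
g = key_group(key, MAX_P)

# The key's group is stable across a rescale; only the owning subtask moves:
owner_at_p4 = operator_index(MAX_P, 4, g)
owner_at_p6 = operator_index(MAX_P, 6, g)
print(g, owner_at_p4, owner_at_p6)
```

If that model holds, state for a key is never silently dropped on a rescale; it is just handed to whichever subtask now owns its key group.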

Thanks!


Aaron Levin

Re: Expected behaviour when changing operator parallelism but starting from an incremental checkpoint

Piotr Nowojski-3
Hi,

Generally speaking, changes of parallelism are supported for both checkpoints and savepoints. Other changes to the job’s topology, like adding/changing/removing operators or changing types in the job graph, are only officially supported via savepoints.

But in reality, as of now, there is no difference between checkpoints and savepoints. That is subject to change, though, so it’s better not to rely on this behaviour. For example, with unaligned checkpoints [1] (hopefully in 1.11), there will be a difference between the two concepts.

Piotrek

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints

On 12 Mar 2020, at 12:16, Aaron Levin <[hidden email]> wrote:

[...]


Re: Expected behaviour when changing operator parallelism but starting from an incremental checkpoint

Aaron Levin
Hi Piotr,

Thanks for your response! I understand that checkpoints and savepoints may be diverging (for unaligned checkpoints), but parts also seem to be converging per FLIP-47 [0]. Specifically, FLIP-47 states that rescaling is "Supported but not in all cases" for checkpoints. What I'm hoping to find is guidance or documentation on when rescaling is supported for checkpoints and, more importantly, whether the cases where it's not supported will result in hard or silent failures.

The context here is that we rely on the exactly-once semantics for our Flink jobs in some important systems. In some cases when a job is in a bad state it may not be able to take a checkpoint, but changing the job's parallelism may resolve the issue. Therefore it's important for us to know if deploying from a checkpoint, on purpose or by operator error, will break the semantic guarantees of our job.

Hard failure in the cases where you cannot change parallelism would be the desired outcome imo.

Thank you!


On Fri, Mar 13, 2020 at 9:08 AM Piotr Nowojski <[hidden email]> wrote:
[...]


Re: Expected behaviour when changing operator parallelism but starting from an incremental checkpoint

Piotr Nowojski-3
Hi Seth,

> Currently, all rescaling operations technically work with checkpoints. It is purely by chance that the implementation supports that, and the line is drawn there because the community is not committed to maintaining that functionality

Are you sure that’s the case? Support for rescaling from a checkpoint is, as far as I know, something that we want/need to have:
- if your cluster has just lost a node due to a hardware failure, your job will not be able to recover without downscaling support
- future planned live rescaling efforts

Also this [1] seems to contradict your statement?

The lack of rescaling support for unaligned checkpoints will hopefully be a temporary limitation of the first version; it’s on our roadmap to solve this in the future.

Piotrek

[1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html#rescaling-stateful-stream-processing-jobs

On 13 Mar 2020, at 17:44, Seth Wiesman <[hidden email]> wrote:

Hi Aaron,

Currently, all rescaling operations technically work with checkpoints. It is purely by chance that the implementation supports that, and the line is drawn there because the community is not committed to maintaining that functionality. As we add features, such as unaligned checkpoints, which actually prevent rescaling, the documentation will be updated accordingly. FLIP-47 has more to do with consolidating terminology and how actions are triggered, and is not particularly relevant to the discussion of rescaling jobs.
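The mechanics can be sketched as a simplified model (illustrative code, not Flink's actual implementation): rescaling from a snapshot recomputes contiguous key-group ranges, and each new subtask reads state from every old subtask whose range overlaps its own, which is why the same mechanism currently works for checkpoints and savepoints alike:

```python
def key_group_range(max_parallelism, parallelism, index):
    # Contiguous, non-overlapping ranges that together cover
    # [0, max_parallelism); the formula is illustrative, not Flink's exact one.
    start = index * max_parallelism // parallelism
    end = (index + 1) * max_parallelism // parallelism
    return range(start, end)

def old_subtasks_to_read(max_parallelism, old_p, new_p, new_index):
    # The old owners of every key group in the new subtask's range.
    new_range = key_group_range(max_parallelism, new_p, new_index)
    return sorted({g * old_p // max_parallelism for g in new_range})

# Downscaling from 4 to 3 subtasks with max parallelism 128:
for i in range(3):
    print(i, old_subtasks_to_read(128, 4, 3, i))
# prints:
# 0 [0, 1]
# 1 [1, 2]
# 2 [2, 3]
```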

On Fri, Mar 13, 2020 at 11:39 AM Aaron Levin <[hidden email]> wrote:
[...]



--
Seth Wiesman | Solutions Architect
+1 314 387 1463


Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Re: Expected behaviour when changing operator parallelism but starting from an incremental checkpoint

Till Rohrmann
I agree with Piotr that we need some type of checkpoint which supports rescaling. Otherwise, the reactive mode and auto-scaling will only work if the system has taken a savepoint, which by definition should only be triggered by the user.

Cheers,
Till

On Mon, Mar 16, 2020 at 8:39 AM Piotr Nowojski <[hidden email]> wrote:
[...]