Hi,
What's the expected behaviour of:

* changing an operator's parallelism
* deploying this change from an incremental (RocksDB) checkpoint instead of a savepoint

The Flink docs[0][1] are a little unclear on what the expected behaviour is here. I understand that the key-space is being changed because the parallelism is changed. I've seen instances where this happens and the job does not fail. But how does it treat potentially missing state for a given key?

I know I can test this, but I'm curious what the _expected_ behaviour is, i.e. what behaviour can I rely on that won't change between versions or releases? Do we expect the job to fail? Do we expect missing keys to just be considered empty?

Thanks!

Aaron Levin
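(For concreteness, a minimal sketch of the kind of job and redeploy this question describes; this is not from the thread, and the class name, bucket paths, and parallelism numbers are made up:)

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RescaleFromCheckpointSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Incremental RocksDB checkpoints, as in the question (the path is a placeholder).
            env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/checkpoints", true));
            env.enableCheckpointing(60_000);

            // Max parallelism fixes the number of key groups and stays constant across deploys;
            // the operator parallelism below is what gets changed between deploys.
            env.setMaxParallelism(128);
            env.setParallelism(4); // e.g. changed to 8 on the next deploy

            env.fromElements("a", "b", "a", "c")
               .keyBy(value -> value)
               .reduce((v1, v2) -> v1 + v2)
               .print();

            env.execute("rescale-from-checkpoint-sketch");
        }
    }

    // Redeploying from a retained checkpoint with a different parallelism would then
    // look something like (paths are placeholders):
    //   flink run -s s3://my-bucket/checkpoints/<job-id>/chk-42 -p 8 my-job.jar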
Hi,
Generally speaking, changing parallelism is supported for both checkpoints and savepoints. Other changes to the job's topology, like adding/changing/removing operators or changing types in the job graph, are only officially supported via savepoints.

In reality, as of now, there is no difference between checkpoints and savepoints, but that's subject to change, so it's better not to rely on this behaviour. For example, with unaligned checkpoints [1] (hopefully in 1.11), there will be a difference between those two concepts.

Piotrek

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
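(As an aside, restoring from a checkpoint after cancellation is only possible at all if retained/externalized checkpoints are enabled. A minimal configuration sketch, again with made-up names; the CLI commands in the trailing comment are the standard savepoint-based route:)

    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RetainedCheckpointsSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000);

            // Keep the latest checkpoint when the job is cancelled, so that restoring
            // from a checkpoint (rather than a savepoint) is possible in the first place.
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            env.fromElements(1, 2, 3).print(); // stand-in for the real pipeline
            env.execute("retained-checkpoints-sketch");
        }
    }

    // The officially supported rescale path goes through a savepoint, e.g.:
    //   flink savepoint <job-id> s3://my-bucket/savepoints
    //   flink run -s s3://my-bucket/savepoints/savepoint-<id> -p <new-parallelism> my-job.jar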
Hi Piotr,

Thanks for your response! I understand that checkpoints and savepoints may be diverging (for unaligned checkpoints), but parts also seem to be converging per FLIP-47[0]. Specifically, FLIP-47 states that rescaling is "Supported but not in all cases" for checkpoints. What I'm hoping to find is guidance or documentation on when rescaling is supported for checkpoints and, more importantly, whether the cases where it's not supported will result in hard or silent failures.

The context here is that we rely on the exactly-once semantics of our Flink jobs in some important systems. In some cases when a job is in a bad state it may not be able to take a checkpoint, but changing the job's parallelism may resolve the issue. Therefore it's important for us to know whether deploying from a checkpoint, on purpose or by operator error, will break the semantic guarantees of our job. Hard failure in the cases where you cannot change parallelism would be the desired outcome, IMO.

Thank you!

On Fri, Mar 13, 2020 at 9:08 AM Piotr Nowojski <[hidden email]> wrote:
Hi Seth,
> Currently, all rescaling operations technically work with checkpoints. That is purely by chance that the implementation supports that, and the line is because the community is not committed to maintaining that functionality
Are you sure that’s the case? Support for rescaling from a checkpoint is, as far as I know, something that we want/need to have:

- if your cluster has just lost a node due to some hardware failure, without downscaling support your job will not be able to recover
- future planned live rescaling efforts

Also, this [1] seems to contradict your statement? Lack of support for rescaling from unaligned checkpoints will hopefully be a temporary limitation of the first version, and it's on our roadmap to solve this in the future.

Piotrek

[1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html#rescaling-stateful-stream-processing-jobs
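(The blog post above describes the key-group mechanism that makes this redistribution possible. A simplified sketch of that assignment follows; it is not the actual implementation, which lives in Flink's KeyGroupRangeAssignment and additionally murmur-hashes the key's hashCode:)

    // Sketch: keys map to key groups based only on maxParallelism, so the mapping is
    // stable across rescales; key groups are then routed to the current subtasks.
    public class KeyGroupSketch {

        static int keyGroupFor(Object key, int maxParallelism) {
            // Simplified: Flink applies a murmur hash to key.hashCode() here.
            return Math.abs(key.hashCode() % maxParallelism);
        }

        static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
            return keyGroup * parallelism / maxParallelism;
        }

        public static void main(String[] args) {
            int maxParallelism = 128;
            int keyGroup = keyGroupFor("user-42", maxParallelism);

            // After a rescale the same key group is simply owned by a different subtask;
            // no per-key state goes missing as long as maxParallelism is unchanged.
            System.out.println("p=4 -> subtask " + subtaskFor(keyGroup, maxParallelism, 4));
            System.out.println("p=8 -> subtask " + subtaskFor(keyGroup, maxParallelism, 8));
        }
    }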
I agree with Piotr that we need some type of checkpoint which supports rescaling. Otherwise, the reactive mode and auto-scaling will only work if the system has taken a savepoint, which by definition should only be done by the user.

Cheers,
Till

On Mon, Mar 16, 2020 at 8:39 AM Piotr Nowojski <[hidden email]> wrote: