Question about start with checkpoint.


Question about start with checkpoint.

yunfan123
How does this exactly work?
For example, I save my state using RocksDB + HDFS.
When I change the parallelism of my job, can starting from a checkpoint still work?

Re: Question about start with checkpoint.

Kostas Kloudas
Hi,

In order to change the parallelism, you should take a savepoint, as described in the savepoints documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/savepoints.html


Kostas

On May 21, 2017, at 5:43 AM, yunfan123 <[hidden email]> wrote:

How does this exactly work?
For example, I save my state using RocksDB + HDFS.
When I change the parallelism of my job, can starting from a checkpoint still work?





Re: Question about start with checkpoint.

yunfan123
I am using the 1.2.0 release, so I read the documentation at https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/savepoints.html.
It says the job can recover from a savepoint even if I change the parallelism.
How does Flink implement this?
For example, with RocksDB + HDFS as the state backend, does Flink just download all of the RocksDB data to the local machines?

Re: Question about start with checkpoint.

SHI Xiaogang
Hi Yunfan,

Jobs are supposed to restart correctly from both savepoints and checkpoints with a different parallelism as long as only operator states and keyed states are used. In cases where unpartitionable state exists (e.g., state produced by the Checkpointed interface), the job will fail to restart if the parallelism is changed.
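
For illustration, here is a minimal, hypothetical sketch of such unpartitionable state written against the old Checkpointed interface (the sink and its deduplication logic are made up for the example): each subtask snapshots a single opaque object, so Flink has no way to split or merge these objects when the number of subtasks changes.

    import java.util.HashSet;

    import org.apache.flink.streaming.api.checkpoint.Checkpointed;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;

    // Hypothetical sketch: state snapshotted through the old Checkpointed
    // interface is one opaque object per subtask and cannot be repartitioned.
    public class DedupSink<T> implements SinkFunction<T>, Checkpointed<HashSet<String>> {

        private HashSet<String> seen = new HashSet<>();

        @Override
        public void invoke(T value) throws Exception {
            seen.add(value.toString());
        }

        @Override
        public HashSet<String> snapshotState(long checkpointId, long checkpointTimestamp) {
            return seen;   // one monolithic blob per subtask
        }

        @Override
        public void restoreState(HashSet<String> state) {
            seen = state;  // only restorable at the original parallelism
        }
    }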

In Flink, both operator states and keyed states are described as collections of objects and hence are partitionable. To be specific, operator state is composed of a list of objects. When the parallelism changes, these objects are redistributed evenly across the tasks.
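
By contrast, here is a minimal, hypothetical sketch of redistributable operator state using the ListCheckpointed interface (the counting sink itself is made up for the example):

    import java.util.Collections;
    import java.util.List;

    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;

    // Hypothetical sketch: the state is exposed as a list of entries, so Flink
    // can collect the entries from all subtasks and split them evenly across
    // the new subtasks when the parallelism changes.
    public class CountingSink<T> implements SinkFunction<T>, ListCheckpointed<Long> {

        private long count;

        @Override
        public void invoke(T value) throws Exception {
            count++;
        }

        @Override
        public List<Long> snapshotState(long checkpointId, long timestamp) {
            // One list entry per subtask in this example.
            return Collections.singletonList(count);
        }

        @Override
        public void restoreState(List<Long> state) {
            // After rescaling, a subtask may receive zero, one, or several entries.
            count = 0;
            for (Long c : state) {
                count += c;
            }
        }
    }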

The assignment of keyed states follows a similar idea. Keyed state is composed of a set of key groups. When the parallelism changes, these key groups are likewise redistributed to the tasks. How the keyed state is restored varies with the state backend setting. In Flink 1.2, the RocksDB state backend downloads all the key-value pairs in its key group range and inserts them into a new RocksDB instance to recover the state.
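
To make the key-group idea concrete, here is a simplified, self-contained sketch of the mapping from keys to key groups and from key groups to subtasks. This is not Flink's actual implementation (the real logic, in KeyGroupRangeAssignment, additionally applies a murmur hash); it only illustrates why a key always stays in the same key group while whole key groups move between subtasks when rescaling.

    // Simplified sketch of the key-group mapping; not Flink's actual code.
    public class KeyGroupSketch {

        // A key always falls into the same key group, independent of the parallelism.
        static int keyGroupFor(Object key, int maxParallelism) {
            return Math.abs(key.hashCode() % maxParallelism);
        }

        // Each subtask owns a contiguous range of key groups, so rescaling only
        // moves whole key groups between subtasks.
        static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
            return keyGroup * parallelism / maxParallelism;
        }

        public static void main(String[] args) {
            int maxParallelism = 128;
            int keyGroup = keyGroupFor("user-42", maxParallelism);
            System.out.println("key group: " + keyGroup);
            System.out.println("subtask at parallelism 2: " + subtaskFor(keyGroup, maxParallelism, 2));
            System.out.println("subtask at parallelism 4: " + subtaskFor(keyGroup, maxParallelism, 4));
        }
    }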

You can find more details about the scaling of keyed states and operator states in the following links.

I hope this information helps you.

Regards
Xiaogang


2017-05-21 11:43 GMT+08:00 yunfan123 <[hidden email]>:
How does this exactly work?
For example, I save my state using RocksDB + HDFS.
When I change the parallelism of my job, can starting from a checkpoint still work?


