Hi Yunfan,
Jobs are supposed to restart correctly from both savepoints and checkpoints with a different parallelism, as long as they only use operator states and keyed states. If a job contains unpartitionable state (e.g., the state produced by the Checkpointed interface), it will fail to restart when the parallelism is changed.
In Flink, both operator states and keyed states are described as collections of objects and are therefore partitionable. To be specific, an operator state is composed of a list of objects. When the parallelism changes, these objects are redistributed evenly across the tasks.
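To make this concrete, here is a rough sketch of an operator that keeps its buffered elements as a redistributable list state. The class and field names are just an example I made up, and it uses the CheckpointedFunction/ListState API as it appears in recent Flink releases:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

// Hypothetical sink that buffers elements and checkpoints them as operator list state.
public class BufferingSink implements SinkFunction<String>, CheckpointedFunction {

    private transient ListState<String> checkpointedState;
    private final List<String> buffer = new ArrayList<>();

    @Override
    public void invoke(String value) throws Exception {
        buffer.add(value);  // a real sink would flush the buffer on some condition
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Store the buffer as a list of objects; each element is an independent
        // unit that Flink may assign to any task when the job is restored.
        checkpointedState.clear();
        for (String element : buffer) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<String> descriptor =
            new ListStateDescriptor<>("buffered-elements", String.class);
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            // After rescaling, this task only receives its share of the list items.
            for (String element : checkpointedState.get()) {
                buffer.add(element);
            }
        }
    }
}

Note that on restore with a different parallelism each subtask only gets a share of the list entries, so the function must not assume it receives exactly what it snapshotted.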
The assignment of keyed states follows a similar idea. Keyed states are composed of a set of key groups. When the parallelism changes, these key groups are also redistributed to the tasks. How keyed states are restored varies with the state backend. In Flink 1.2, the RocksDB state backend downloads all the key-value pairs in its key-group range and inserts them into a new RocksDB instance to recover the state.
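As a rough illustration (again, the names are just made up), a keyed function only accesses state that is scoped to the current key; the key determines the key group, and the key group determines which task owns that state after rescaling:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Hypothetical per-key counter backed by keyed ValueState.
public class CountPerKey
        extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
            new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> input,
                        Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = count.value();  // state bound to the current key only
        long updated = (current == null ? 0L : current) + 1L;
        count.update(updated);
        out.collect(Tuple2.of(input.f0, updated));
    }
}

Such a function is applied after a keyBy(...), e.g. stream.keyBy(t -> t.f0).flatMap(new CountPerKey()), so that every record carries a key and its state can be placed in a key group.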
You can find more details about the scaling of keyed states and operator states in the following links.
I hope this information helps you.
Regards,
Xiaogang