What happens when a job is rescaled

What happens when a job is rescaled

Richard Deurwaarder
Hello,

I have a question about what actually happens when a job is started from an existing checkpoint, in particular when the parallelism has changed.

Context:
We have a Flink 1.11.2 (DataStream API) job running on Kubernetes (GCP), writing its state to GCS.
Normally we run with 12 TMs, each with 3 CPU cores and about 12 GB RAM. We have quite a bit of state (stored with RocksDB): about 20-25 operators with state ranging from 20 GB to 180 GB per operator, roughly 600 GB of state in total.

During normal operations this works fine; the only 'problem' we have is that savepoints (both creating them and starting from them) are very slow. Therefore we use externalized checkpoints to deploy new versions of our job.
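For reference, a minimal sketch of the kind of setup described above (the GCS bucket path and the intervals are placeholders, not our real values):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointSetup {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // RocksDB state backend writing incremental checkpoints to GCS
            // (the bucket path is a placeholder).
            env.setStateBackend(new RocksDBStateBackend("gs://my-bucket/checkpoints", true));

            // Checkpoint periodically and retain the checkpoint when the job is
            // cancelled, so it can be used to redeploy or rescale the job later.
            env.enableCheckpointing(10 * 60 * 1000L);
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            // ... build the rest of the pipeline and call env.execute() ...
        }
    }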

What is our problem?
One of the things I am currently trying to investigate is why rescaling our job is so slow. The way we rescale is by canceling the job and then starting the job with a higher parallelism, whilst pointing to the previous (external) checkpoint.
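For context, rescaling from a checkpoint is bounded by the job's max parallelism (the number of key groups). A minimal sketch of pinning it explicitly; the value 128 is just an example, not necessarily what we use:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class MaxParallelismSetup {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Pin the maximum parallelism explicitly. State in a checkpoint is split into
            // this many key groups, and a restored job can never scale beyond it.
            // 128 is only an example value; pick it once and keep it stable.
            env.setMaxParallelism(128);

            // ... rest of the pipeline ...
        }
    }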

Without rescaling, for instance when deploying new code, starting the job from a checkpoint means the first new checkpoint completes in maybe 5 minutes.
However, if I double the parallelism, the first checkpoint takes an hour or more to complete. This is troublesome because Kubernetes might sometimes decide to restart a TM, causing a job restart and forcing us to redo all the checkpoint work (very annoying if this happens when the checkpoint is about to finish.. :) )
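One thing we are considering (just a sketch of an idea; the concrete numbers below are placeholders, not values we have validated) is giving checkpoints more headroom so a slow first checkpoint after a rescale is not cancelled and does not fail the job:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointHeadroom {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(10 * 60 * 1000L);

            // Allow a slow checkpoint (e.g. the first one after a rescale) up to 2 hours
            // before it is cancelled, tolerate a few checkpoint failures without failing
            // the job, and keep some pause between checkpoints. Placeholder values.
            env.getCheckpointConfig().setCheckpointTimeout(2 * 60 * 60 * 1000L);
            env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5 * 60 * 1000L);
        }
    }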

What happens during a checkpoint:
Looking at metrics we can see:
 * CPU is at 100%.
 * RAM swings up and down depending on which operator is currently checkpointing.
 * Network traffic to GCS peaks at 100 MB/s per TM (tests indicate the network should not be a bottleneck).
 * Disk (SSD) IOPS are on the order of 2,000-3,000, with spikes up to 10k IOPS, nowhere near capacity.

Now the obvious answer would be to add more CPU. This does not really seem to help, though, and we'd really like to avoid having to vertically scale our job just to change parallelism, as during normal operations our CPU usage is around 50-60%.

Question:
My question is: 
What actually happens when Flink starts a new job from an existing checkpoint? What extra work needs to be done because of a change in parallelism? Is it 'normal' to incur this penalty for scaling up or down?
Do you have any pointers where we should look to get better performance?

Thank you in advance :)

Richard

Re: What happens when a job is rescaled

Yun Tang
Hi Richard,

Since you did not mention which state backend you use, I will describe the rescaling phase when restoring from an externalized checkpoint for two different state backends:

For RocksDB:
1) If the parallelism has not changed, Flink downloads all SST files and simply opens them as one RocksDB instance.
2) If the parallelism has changed, Flink chooses one of the candidate RocksDB instances as the initial one to open [1], then opens the other RocksDB instances and inserts their data into the target one. This means Flink needs to write data into the target RocksDB, with possibly high CPU usage.
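To illustrate why the data has to be rewritten when the parallelism changes: every key maps to a fixed key group, but the key-group range owned by each subtask depends on the parallelism. A small sketch using Flink's key-group utilities (the parallelism numbers are only examples, and this is not the actual restore code):

    import org.apache.flink.runtime.state.KeyGroupRange;
    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    public class KeyGroupDemo {
        public static void main(String[] args) {
            int maxParallelism = 128;   // number of key groups, fixed for the job's lifetime

            // A key always maps to the same key group, independent of the parallelism.
            int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup("some-key", maxParallelism);

            // Key-group ranges owned by subtask 0 at the old and the new parallelism.
            KeyGroupRange atParallelism12 =
                    KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(maxParallelism, 12, 0);
            KeyGroupRange atParallelism24 =
                    KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(maxParallelism, 24, 0);

            // With parallelism 12, subtask 0 owns key groups 0..10; with parallelism 24 it
            // only owns 0..5, so its RocksDB instance has to be rebuilt from a different
            // slice of the old state, which is why the data is re-read and re-written.
            System.out.println("key group of 'some-key': " + keyGroup);
            System.out.println("subtask 0 at parallelism 12: " + atParallelism12);
            System.out.println("subtask 0 at parallelism 24: " + atParallelism24);
        }
    }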

For FsStateBackend:
1) Whether or not the parallelism changed, Flink reads data from the remote DFS and writes it into memory, which also occupies CPU resources.

Since you observed disk usage during the rescaling phase, I think you are using the RocksDB state backend. Unfortunately, we might not have a really good solution to improve this phase at the moment [2]. Also, once the rescaling completes, the next checkpoint could be nearly a full checkpoint instead of an incremental one; you could check the size of that next checkpoint to see whether the data size increased. BTW, you could also check whether the job is backpressured during the next checkpoint, as backpressure would also increase the checkpoint duration.


Best
Yun Tang
