Just to add. You can also change parallelism from checkpoints (it's usually much faster than using savepoints). For that, you want to use external checkpoints that are retained after job completion.
But savepoints are the way to go for any topology changes, version updates, etc.
Hi Arvid,
Thanks for taking the time to answer this. Yes, we also use savepoints as the only restore mechanism when job parallelism needs to change or some job graph properties need to be updated. Otherwise, during rolling deployments of task manager pods or job manager pods, we rely solely on previously completed checkpoints.
Hi Dhanesh,
We recommend using savepoints only for migrations, investigations, A/B testing, and time travel, and relying completely on checkpoints for fault tolerance. Are you using them differently?
Currently, we trigger savepoints using the REST API and query the
status of the savepoint using the returned handle. If a network issue
prevents us from receiving the response, how can we find out whether
the savepoint in the previous request was triggered? Is there a way to
add an "idempotency-key" to each API request so that we can safely retry
triggering a savepoint? By doing this, we want to avoid triggering
multiple consecutive savepoints during job upgrades.
I think you'd have to use your logging system and have a metric/trigger on the respective line. I don't think there is any REST API for that.
Our workflow for capturing a savepoint looks like this: call the POST
/savepoint endpoint, use the returned trigger handle to periodically
poll the status of the savepoint, and once the savepoint has completed,
restore the job from it. We are running our Flink clusters in k8s.
Since pods can be restarted or migrated quite often in k8s, it's
possible that the JM pod that was used to capture the savepoint is
recycled before the savepoint completes. In that case, we can't query
the status of the triggered savepoint with the previously returned
handle, because neither the newly created JM pod nor any of the standby
JMs has information about this savepoint. I couldn't find any config
that makes Flink persist the state of ongoing savepoints to an external
store, which would allow users to query the status of a savepoint via
any available JM instance in an HA setup.
Not an expert on K8s, but couldn't you expose the JM as a K8s service? That should follow the migration automatically.
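To make the trigger-and-poll workflow above concrete, here is a minimal Python sketch that addresses the JM through a stable service name instead of a pod IP. Everything specific in it is an assumption for illustration: the service URL `flink-jobmanager:8081` is hypothetical, the function names are made up, and the endpoint paths and JSON fields (`request-id`, `status.id`, `operation.location`, `operation.failure-cause`) reflect my reading of the Flink REST API docs, so please check them against your Flink version. Note that a service only keeps the address stable; it does not make a new JM aware of a trigger handle issued by the old one.

```python
import json
import time
import urllib.request

# Assumption: the JM REST endpoint is exposed through a K8s Service.
# This hostname is hypothetical; substitute your own service name.
BASE_URL = "http://flink-jobmanager:8081"


def http_json(method, url, body=None):
    """Send a JSON request and return the decoded JSON response."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    req = urllib.request.Request(
        url, data=data, method=method,
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.load(resp)


def trigger_savepoint(job_id, target_dir, request=http_json):
    """Trigger a savepoint and return the trigger handle."""
    reply = request(
        "POST", f"{BASE_URL}/jobs/{job_id}/savepoints",
        {"target-directory": target_dir, "cancel-job": False},
    )
    return reply["request-id"]


def wait_for_savepoint(job_id, trigger_id, request=http_json,
                       poll_seconds=2.0, max_polls=300, sleep=time.sleep):
    """Poll the trigger handle until the savepoint completes or fails."""
    url = f"{BASE_URL}/jobs/{job_id}/savepoints/{trigger_id}"
    for _ in range(max_polls):
        reply = request("GET", url)
        if reply["status"]["id"] == "COMPLETED":
            operation = reply.get("operation", {})
            # A finished operation carries either a location or a failure.
            if "failure-cause" in operation:
                raise RuntimeError(
                    f"savepoint failed: {operation['failure-cause']}")
            return operation["location"]
        sleep(poll_seconds)
    raise TimeoutError("savepoint did not complete in time")
```

The `request` and `sleep` parameters are only there so the polling logic can be exercised without a running cluster; in normal use you would call `trigger_savepoint(job_id, target_dir)` followed by `wait_for_savepoint(job_id, trigger_id)`.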
If one of the TMs crashes during an ongoing checkpoint, then I believe
that checkpoint is marked as failed, and on the next checkpoint interval
Flink triggers a new checkpoint by looking at the previously completed
checkpoint counter. The next checkpoint attempt might get acknowledged
by all operators and marked as completed. Is that correct? In the case
of savepoints this is not possible. So how does Flink resume the
savepoint capturing process in case of job restarts or TM failures?
Savepoints have to be triggered anew. Savepoints are meant as a purely manual feature. Again, you could automate it, if you look at the logs.
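If you do automate this, "triggered anew" could look roughly like the following Python sketch: a small retry wrapper that starts a fresh savepoint attempt whenever the previous one fails. This is client-side logic only, not something Flink provides; `savepoint_with_retries` and `take_savepoint` are hypothetical names, and `take_savepoint` stands for whatever routine you use to trigger one savepoint and wait for its result.

```python
import time


def savepoint_with_retries(take_savepoint, max_attempts=3,
                           backoff_seconds=5.0, sleep=time.sleep):
    """Run take_savepoint() until it succeeds, triggering a new
    savepoint from scratch after each failed attempt."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            # Each call is a brand-new savepoint; nothing is resumed.
            return take_savepoint()
        except Exception as err:
            last_error = err
            if attempt < max_attempts:
                # Back off a little longer after each failure.
                sleep(backoff_seconds * attempt)
    raise RuntimeError(
        f"savepoint failed after {max_attempts} attempts") from last_error
```

The backoff gives the job time to recover from a TM failure or restart before the next attempt; the attempt count and delay are arbitrary defaults, not recommendations.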
Best,
Arvid
On Fri, Apr 16, 2021 at 12:33 PM dhanesh arole < [hidden email]> wrote:
Hello all,
I have two questions regarding savepoint fault tolerance.
Job manager restart:
- Currently, we trigger savepoints using the REST API and query the status of the savepoint using the returned handle. If a network issue prevents us from receiving the response, how can we find out whether the savepoint in the previous request was triggered? Is there a way to add an "idempotency-key" to each API request so that we can safely retry triggering a savepoint? By doing this, we want to avoid triggering multiple consecutive savepoints during job upgrades.
- Our workflow for capturing a savepoint looks like this: call the POST /savepoint endpoint, use the returned trigger handle to periodically poll the status of the savepoint, and once the savepoint has completed, restore the job from it. We are running our Flink clusters in k8s. Since pods can be restarted or migrated quite often in k8s, it's possible that the JM pod that was used to capture the savepoint is recycled before the savepoint completes. In that case, we can't query the status of the triggered savepoint with the previously returned handle, because neither the newly created JM pod nor any of the standby JMs has information about this savepoint. I couldn't find any config that makes Flink persist the state of ongoing savepoints to an external store, which would allow users to query the status of a savepoint via any available JM instance in an HA setup.
Task manager restart:
- If one of the TMs crashes during an ongoing checkpoint, then I believe that checkpoint is marked as failed, and on the next checkpoint interval Flink triggers a new checkpoint by looking at the previously completed checkpoint counter. The next checkpoint attempt might get acknowledged by all operators and marked as completed. Is that correct? In the case of savepoints this is not possible. So how does Flink resume the savepoint capturing process in case of job restarts or TM failures?
I am sure this must already be handled, but I just wanted to confirm and get help finding the relevant code references so I can dig deeper and understand it in depth from an educational point of view.
- Dhanesh Arole ( Sent from mobile device. Pardon me for typos )