After configuring a checkpoint strategy, Flink job cannot restart when the job fails


After configuration checkpoint strategy, Flink Job cannot restart when job failed

1095193290@qq.com
Hi community,
I have  a job which read data from Datahub and sink data to Elasticsearch. The Elasticsearch frequently timeout which lead to Flink job failed and stopped, then a manually restart is needed.  After investigate checkpoint strategy, I believe checkpoint can restart job automaically and avoid a manually restart when job failed. However,  the job still failed and stopped when Elasticsearch timeout although I have configure checkpoint in flink-conf.yaml
flink-conf.yaml
state.checkpoints.dir: hdfs://172.16.1.192:9000/flink-checkpoints
execution.checkpointing.interval: 10 s
state.savepoints.dir: hdfs://172.16.1.192:9000/flink-savepoints
restart-strategy: fixed-delay
restart-strategy.fixed-delay.delay: 1 min
Flink log (see attachment for the full log):
[INFO ] 2021-06-05 10:35:59.020 [flink-akka.actor.default-dispatcher-19] o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy-[getTasksNeedingRestart] - 115 - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
[INFO ] 2021-06-05 10:35:59.020 [flink-akka.actor.default-dispatcher-19] o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy-[getTasksNeedingRestart] - 152 - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0. 
The log above shows that the restart strategy kicks in after the exception occurs.
[INFO ] 2021-06-05 10:38:09.428 [flink-akka.actor.default-dispatcher-4] o.a.f.r.c.CheckpointCoordinator-[shutdown] - 405 - Stopping checkpoint coordinator for job 63c270e00b69eb967f59479bb1c84113.
[INFO ] 2021-06-05 10:38:09.428 [flink-akka.actor.default-dispatcher-4] o.a.f.r.c.StandaloneCompletedCheckpointStore-[shutdown] - 96 - Shutting down
[INFO ] 2021-06-05 10:38:09.451 [flink-akka.actor.default-dispatcher-2] o.a.f.r.dispatcher.MiniDispatcher-[jobReachedGloballyTerminalState] - 827 - Job 63c270e00b69eb967f59479bb1c84113 reached globally terminal state FAILED.
[INFO ] 2021-06-05 10:38:09.452 [flink-akka.actor.default-dispatcher-2] o.a.f.r.dispatcher.MiniDispatcher-[jobReachedGloballyTerminalState] - 132 - Shutting down cluster with state FAILED, jobCancelled: false, executionMode: DETACHED
[INFO ] 2021-06-05 10:38:09.453 [flink-akka.actor.default-dispatcher-2] o.a.f.r.entrypoint.ClusterEntrypoint-[shutDownAsync] - 481 - Shutting YarnJobClusterEntrypoint down with application status FAILED. Diagnostics null.
[INFO ] 2021-06-05 10:38:09.453 [flink-akka.actor.default-dispatcher-2] o.a.f.r.j.MiniDispatcherRestEndpoint-[closeAsync] - 309 - Shutting down rest endpoint.
[INFO ] 2021-06-05 10:38:09.463 [flink-akka.actor.default-dispatcher-4] o.a.f.runtime.jobmaster.JobMaster-[onStop] - 395 - Stopping the JobMaster for job insert-into_default_catalog.default_database.table3(63c270e00b69eb967f59479bb1c84113).
However, the Flink cluster eventually shut down after several restarts failed. Why did my Flink job eventually fail even though checkpointing is enabled and restart-strategy is set to fixed-delay?




Attachment: log.txt (145K)
Re: After configuring a checkpoint strategy, Flink job cannot restart when the job fails

Chesnay Schepler
The default number of restart attempts is 1. You need to explicitly configure it to allow more failures.
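In other words, the flink-conf.yaml above sets only the delay, so `restart-strategy.fixed-delay.attempts` stays at its default of 1 and the job reaches FAILED after a single retry. A minimal sketch of the corrected config (the attempt count of 10 is an illustrative value, not a recommendation):

```yaml
# flink-conf.yaml -- retry up to 10 times, waiting 1 minute between attempts
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 1 min
```

The same strategy can also be set per job in code, e.g. `env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.minutes(1)))`, which overrides the cluster-wide default for that job.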

On 6/7/2021 11:53 AM, [hidden email] wrote: