Posted by
1095193290@qq.com on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/After-configuration-checkpoint-strategy-Flink-Job-cannot-restart-when-job-failed-tp44247.html
Hi community,
I have a job which read data from Datahub and sink data to Elasticsearch. The Elasticsearch frequently timeout which lead to Flink job failed and stopped, then a manually restart is needed. After investigate checkpoint strategy, I believe checkpoint can restart job automaically and avoid a manually restart when job failed. However, the job still failed and stopped when Elasticsearch timeout although I have configure checkpoint in flink-conf.yaml
flink-conf.yaml
state.checkpoints.dir: hdfs://172.16.1.192:9000/flink-checkpoints
execution.checkpointing.interval: 10 s
state.savepoints.dir: hdfs://172.16.1.192:9000/flink-savepoints
restart-strategy: fixed-delay
restart-strategy.fixed-delay.delay: 1 min
flink log
see attachment for full log.
[INFO ] 2021-06-05 10:35:59.020 [flink-akka.actor.default-dispatcher-19] o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy-[getTasksNeedingRestart] - 115 - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
[INFO ] 2021-06-05 10:35:59.020 [flink-akka.actor.default-dispatcher-19] o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy-[getTasksNeedingRestart] - 152 - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
the above log shows restarted strategy works after Exception occurs.
[INFO ] 2021-06-05 10:38:09.428 [flink-akka.actor.default-dispatcher-4] o.a.f.r.c.CheckpointCoordinator-[shutdown] - 405 - Stopping checkpoint coordinator for job 63c270e00b69eb967f59479bb1c84113.
[INFO ] 2021-06-05 10:38:09.428 [flink-akka.actor.default-dispatcher-4] o.a.f.r.c.StandaloneCompletedCheckpointStore-[shutdown] - 96 - Shutting down
[INFO ] 2021-06-05 10:38:09.451 [flink-akka.actor.default-dispatcher-2] o.a.f.r.dispatcher.MiniDispatcher-[jobReachedGloballyTerminalState] - 827 - Job 63c270e00b69eb967f59479bb1c84113 reached globally terminal state FAILED.
[INFO ] 2021-06-05 10:38:09.452 [flink-akka.actor.default-dispatcher-2] o.a.f.r.dispatcher.MiniDispatcher-[jobReachedGloballyTerminalState] - 132 - Shutting down cluster with state FAILED, jobCancelled: false, executionMode: DETACHED
[INFO ] 2021-06-05 10:38:09.453 [flink-akka.actor.default-dispatcher-2] o.a.f.r.entrypoint.ClusterEntrypoint-[shutDownAsync] - 481 - Shutting YarnJobClusterEntrypoint down with application status FAILED. Diagnostics null.
[INFO ] 2021-06-05 10:38:09.453 [flink-akka.actor.default-dispatcher-2] o.a.f.r.j.MiniDispatcherRestEndpoint-[closeAsync] - 309 - Shutting down rest endpoint.
[INFO ] 2021-06-05 10:38:09.463 [flink-akka.actor.default-dispatcher-4] o.a.f.runtime.jobmaster.JobMaster-[onStop] - 395 - Stopping the JobMaster for job insert-into_default_catalog.default_database.table3(63c270e00b69eb967f59479bb1c84113).
However, flink cluster eventually shutdown after serveral restarts failed. Why my flink job eventually failed even though checkpoint is enabled and restart-strategy is set to fixed-delay?