Hi community, I have a job which read data from Datahub and sink data to Elasticsearch. The Elasticsearch frequently timeout which lead to Flink job failed and stopped, then a manually restart is needed. After investigate checkpoint strategy, I believe checkpoint can restart job automaically and avoid a manually restart when job failed. However, the job still failed and stopped when Elasticsearch timeout although I have configure checkpoint in flink-conf.yaml flink-conf.yaml state.checkpoints.dir: hdfs://172.16.1.192:9000/flink-checkpoints execution.checkpointing.interval: 10 s state.savepoints.dir: hdfs://172.16.1.192:9000/flink-savepoints restart-strategy: fixed-delay restart-strategy.fixed-delay.delay: 1 min flink log see attachment for full log. [INFO ] 2021-06-05 10:35:59.020 [flink-akka.actor.default-dispatcher-19] o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy-[getTasksNeedingRestart] - 115 - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0. [INFO ] 2021-06-05 10:35:59.020 [flink-akka.actor.default-dispatcher-19] o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy-[getTasksNeedingRestart] - 152 - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0. the above log shows restarted strategy works after Exception occurs. [INFO ] 2021-06-05 10:38:09.428 [flink-akka.actor.default-dispatcher-4] o.a.f.r.c.CheckpointCoordinator-[shutdown] - 405 - Stopping checkpoint coordinator for job 63c270e00b69eb967f59479bb1c84113. [INFO ] 2021-06-05 10:38:09.428 [flink-akka.actor.default-dispatcher-4] o.a.f.r.c.StandaloneCompletedCheckpointStore-[shutdown] - 96 - Shutting down [INFO ] 2021-06-05 10:38:09.451 [flink-akka.actor.default-dispatcher-2] o.a.f.r.dispatcher.MiniDispatcher-[jobReachedGloballyTerminalState] - 827 - Job 63c270e00b69eb967f59479bb1c84113 reached globally terminal state FAILED. [INFO ] 2021-06-05 10:38:09.452 [flink-akka.actor.default-dispatcher-2] o.a.f.r.dispatcher.MiniDispatcher-[jobReachedGloballyTerminalState] - 132 - Shutting down cluster with state FAILED, jobCancelled: false, executionMode: DETACHED [INFO ] 2021-06-05 10:38:09.453 [flink-akka.actor.default-dispatcher-2] o.a.f.r.entrypoint.ClusterEntrypoint-[shutDownAsync] - 481 - Shutting YarnJobClusterEntrypoint down with application status FAILED. Diagnostics null. [INFO ] 2021-06-05 10:38:09.453 [flink-akka.actor.default-dispatcher-2] o.a.f.r.j.MiniDispatcherRestEndpoint-[closeAsync] - 309 - Shutting down rest endpoint. [INFO ] 2021-06-05 10:38:09.463 [flink-akka.actor.default-dispatcher-4] o.a.f.runtime.jobmaster.JobMaster-[onStop] - 395 - Stopping the JobMaster for job insert-into_default_catalog.default_database.table3(63c270e00b69eb967f59479bb1c84113). However, flink cluster eventually shutdown after serveral restarts failed. Why my flink job eventually failed even though checkpoint is enabled and restart-strategy is set to fixed-delay? log.txt (145K) Download Attachment |
The default
number of restart attempts is 1. You need to explicitly
configure it to allow more failures.
On 6/7/2021 11:53
AM, [hidden email] wrote:
|
Free forum by Nabble | Edit this page |