expected behavior when Flink job cluster exhausted all restarts

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

expected behavior when Flink job cluster exhausted all restarts

Eleanore Jin
Hi experts,
I am running a flink job cluster, the application jar is packaged together with flink in a docker image. The flink job cluster is running in kubernetes, the restart strategy is below

restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 20
restart-strategy.failure-rate.failure-rate-interval: 3 min
restart-strategy.failure-rate.delay: 100 ms

The job manager is not setup in HA mode, so only 1 pod. 

What I have observed is the job manager pod has restarted a few times, and when it restarts, it will start as a new flink job (hence a new flink job id), so it seems it could not restart from the last successful checkpoint, highlighted in yellow is what the evidence. 

So I wonder in this case, should I set the flink job as a fixed value? (if there is a way to set it), or should I set the restart strategy to retry infinite? Or something else I should do?

Thanks a lot!
Eleanore

{"@timestamp":"2020-10-21T09:45:30.571Z","@version":"1","message":"1 tasks should be restarted to recover the failed task 6c62b269f2830c09ffe62c59c9f52d9c_19. ","logger_name":"org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":20000}


{"@timestamp":"2020-10-21T09:45:30.572Z","@version":"1","message":"Job 9ikvi0743v9rkayb1qof (0b4c1ed9cd2cb47ee99ddb173a9beee5) switched from state RESTARTING to FAILING.","logger_name":"org.apache.flink.runtime.executiongraph.ExecutionGraph","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":20000,"stack_trace":"org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException: Could not submit task because there is no JobManager associated for the job 0b4c1ed9cd2cb47ee99ddb173a9beee5.\n\tat org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:475)

Reply | Threaded
Open this post in threaded view
|

Re: expected behavior when Flink job cluster exhausted all restarts

Till Rohrmann
Hi Eleanore,

if you want to tolerate JM restarts, then you have to enable HA. W/o HA, a JM restart is effectively a submission of a new job.

In order to tell you more about the Task submission rejection by the TaskExecutor, I would need to take a look at the logs of the JM and the rejecting TaskExecutor. Moreover, which Flink version are you using?

Cheers,
Till

On Wed, Oct 21, 2020 at 6:53 PM Eleanore Jin <[hidden email]> wrote:
Hi experts,
I am running a flink job cluster, the application jar is packaged together with flink in a docker image. The flink job cluster is running in kubernetes, the restart strategy is below

restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 20
restart-strategy.failure-rate.failure-rate-interval: 3 min
restart-strategy.failure-rate.delay: 100 ms

The job manager is not setup in HA mode, so only 1 pod. 

What I have observed is the job manager pod has restarted a few times, and when it restarts, it will start as a new flink job (hence a new flink job id), so it seems it could not restart from the last successful checkpoint, highlighted in yellow is what the evidence. 

So I wonder in this case, should I set the flink job as a fixed value? (if there is a way to set it), or should I set the restart strategy to retry infinite? Or something else I should do?

Thanks a lot!
Eleanore

{"@timestamp":"2020-10-21T09:45:30.571Z","@version":"1","message":"1 tasks should be restarted to recover the failed task 6c62b269f2830c09ffe62c59c9f52d9c_19. ","logger_name":"org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":20000}


{"@timestamp":"2020-10-21T09:45:30.572Z","@version":"1","message":"Job 9ikvi0743v9rkayb1qof (0b4c1ed9cd2cb47ee99ddb173a9beee5) switched from state RESTARTING to FAILING.","logger_name":"org.apache.flink.runtime.executiongraph.ExecutionGraph","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":20000,"stack_trace":"org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException: Could not submit task because there is no JobManager associated for the job 0b4c1ed9cd2cb47ee99ddb173a9beee5.\n\tat org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:475)

Reply | Threaded
Open this post in threaded view
|

Re: expected behavior when Flink job cluster exhausted all restarts

Eleanore Jin
Hi Till, 

thanks a lot for the explanation. Im using Flink 1.10.2 with java 11.

Thanks!
Eleanore

On Fri, Oct 23, 2020 at 4:31 AM Till Rohrmann <[hidden email]> wrote:
Hi Eleanore,

if you want to tolerate JM restarts, then you have to enable HA. W/o HA, a JM restart is effectively a submission of a new job.

In order to tell you more about the Task submission rejection by the TaskExecutor, I would need to take a look at the logs of the JM and the rejecting TaskExecutor. Moreover, which Flink version are you using?

Cheers,
Till

On Wed, Oct 21, 2020 at 6:53 PM Eleanore Jin <[hidden email]> wrote:
Hi experts,
I am running a flink job cluster, the application jar is packaged together with flink in a docker image. The flink job cluster is running in kubernetes, the restart strategy is below

restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 20
restart-strategy.failure-rate.failure-rate-interval: 3 min
restart-strategy.failure-rate.delay: 100 ms

The job manager is not setup in HA mode, so only 1 pod. 

What I have observed is the job manager pod has restarted a few times, and when it restarts, it will start as a new flink job (hence a new flink job id), so it seems it could not restart from the last successful checkpoint, highlighted in yellow is what the evidence. 

So I wonder in this case, should I set the flink job as a fixed value? (if there is a way to set it), or should I set the restart strategy to retry infinite? Or something else I should do?

Thanks a lot!
Eleanore

{"@timestamp":"2020-10-21T09:45:30.571Z","@version":"1","message":"1 tasks should be restarted to recover the failed task 6c62b269f2830c09ffe62c59c9f52d9c_19. ","logger_name":"org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":20000}


{"@timestamp":"2020-10-21T09:45:30.572Z","@version":"1","message":"Job 9ikvi0743v9rkayb1qof (0b4c1ed9cd2cb47ee99ddb173a9beee5) switched from state RESTARTING to FAILING.","logger_name":"org.apache.flink.runtime.executiongraph.ExecutionGraph","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":20000,"stack_trace":"org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException: Could not submit task because there is no JobManager associated for the job 0b4c1ed9cd2cb47ee99ddb173a9beee5.\n\tat org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:475)