Cancelling job with savepoint fails sometimes

Cancelling job with savepoint fails sometimes

James Isaac
I have noticed that when I try to cancel a Flink job with a savepoint, the cancel sometimes fails with the following error:

org.apache.flink.util.FlinkException: Could not cancel job 3be3d380dca9bb6a5cf0d559d54d7ff8.
        at org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:581)
        at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:955)
        at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:573)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1029)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
        at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to trigger savepoint. Decline reason: Not all required tasks are currently running.
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at org.apache.flink.client.program.rest.RestClusterClient.cancelWithSavepoint(RestClusterClient.java:385)
        at org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:579)
        ... 6 more
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to trigger savepoint. Decline reason: Not all required tasks are currently running.
        at org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:959)
        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
        at java.util.concurrent.CompletableFuture.uniExceptionallyStage(CompletableFuture.java:884)
        at java.util.concurrent.CompletableFuture.exceptionally(CompletableFuture.java:2196)
        at org.apache.flink.runtime.jobmaster.JobMaster.triggerSavepoint(JobMaster.java:955)
        at sun.reflect.GeneratedMethodAccessor78.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to trigger savepoint. Decline reason: Not all required tasks are currently running.
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
        at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:614)
        at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1983)
        at org.apache.flink.runtime.jobmaster.JobMaster.triggerSavepoint(JobMaster.java:947)
        ... 20 more
Caused by: org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to trigger savepoint. Decline reason: Not all required tasks are currently running.
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:377)
        at org.apache.flink.runtime.jobmaster.JobMaster.triggerSavepoint(JobMaster.java:946)
        ... 20 more


Also, I see the following lines in the JobManager logs:
2018-07-11 05:41:13,316 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom Source -> Flat Map (1/3) of job e691fa002c682703735afb178ce6ba37 is not being executed at the moment. Aborting checkpoint.
2018-07-11 05:41:13,517 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom Source -> Flat Map (1/3) of job e691fa002c682703735afb178ce6ba37 is not being executed at the moment. Aborting checkpoint.
2018-07-11 05:41:13,716 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom Source -> Flat Map (1/3) of job e691fa002c682703735afb178ce6ba37 is not being executed at the moment. Aborting checkpoint.

Retrying the cancel at this point doesn't help. It keeps failing with the same error till the job runs to completion.
Note that this issue happens intermittently, not always.

Do I need to do anything in particular in my application's source and sink checkpointing code? Have I forgotten to take care of something? I am using Flink 1.5.0.

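I came across a similar issue here, but I don't see any updates:
http://mail-archives.apache.org/mod_mbox/flink-issues/201706.mbox/%3CJIRA.13082229.1498251834000.96074.1498260420026@...%3E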

Regards,
James
Re: Cancelling job with savepoint fails sometimes

Chesnay Schepler
My guess is that this is related to
https://issues.apache.org/jira/browse/FLINK-2491.

The relevant bit is "Failed to trigger savepoint. Decline reason: Not
all required tasks are currently running."

So, if one task has already finished (for example, a source with a small
finite input), then the savepoint cannot be taken. The same may apply if
a task is currently restarting, failing, etc.
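
To illustrate the condition behind that decline reason, here is a minimal
sketch in plain Java. It is a simplified model, not Flink's actual
CheckpointCoordinator code: the class SavepointPrecheck, the TaskState enum
and the method canTriggerSavepoint are hypothetical names made up for this
example. It only shows the idea that a single task which is not RUNNING is
enough for the savepoint trigger to be declined.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of the precondition; this is NOT Flink's implementation.
class SavepointPrecheck {

    // Hypothetical stand-in for the execution states a task can be in.
    enum TaskState { CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED }

    // Returns true only if every required task is currently RUNNING.
    static boolean canTriggerSavepoint(List<TaskState> taskStates) {
        for (TaskState state : taskStates) {
            if (state != TaskState.RUNNING) {
                return false; // one finished, restarting, or failing task is enough to decline
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // All three source subtasks are running: the savepoint could be triggered.
        List<TaskState> allRunning =
                Arrays.asList(TaskState.RUNNING, TaskState.RUNNING, TaskState.RUNNING);
        // Subtask (1/3) has already finished: the trigger would be declined.
        List<TaskState> oneFinished =
                Arrays.asList(TaskState.FINISHED, TaskState.RUNNING, TaskState.RUNNING);

        System.out.println(canTriggerSavepoint(allRunning));
        System.out.println(canTriggerSavepoint(oneFinished));
    }
}
```

Under this model, retrying cannot succeed once a task has finished, since a
finished task never returns to RUNNING. That would be consistent with the
repeated "Aborting checkpoint" lines in the JobManager log.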

On 11.07.2018 09:53, Data Engineer wrote:

> I came across a similar issue here, but I don't see any updates:
> http://mail-archives.apache.org/mod_mbox/flink-issues/201706.mbox/%3CJIRA.13082229.1498251834000.96074.1498260420026@...%3E
>
> Regards,
> James