Per job cluster doesn't shut down after the job is canceled

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

Per job cluster doesn't shut down after the job is canceled

Paul Lam
Hi, 

I’m using Flink 1.5.3, and I’ve seen several times that the detached YARN cluster doesn’t shut down after the job is canceled successfully. The only errors I found in jobmanager’s log are as below (the second one appears multiple times):

```
2018-11-07 09:48:38,663 WARN  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Error while notifying JobStatusListener
java.lang.IllegalStateException: Incremented the completed number of checkpoints without incrementing the in progress checkpoints before.
	at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.incrementFailedCheckpoints(CheckpointStatsCounts.java:165)
	at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.reportFailedCheckpoint(CheckpointStatsTracker.java:270)
	at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.access$100(CheckpointStatsTracker.java:55)
	at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker$PendingCheckpointStatsCallback.reportFailedCheckpoint(CheckpointStatsTracker.java:314)
	at org.apache.flink.runtime.checkpoint.PendingCheckpointStats.reportFailedCheckpoint(PendingCheckpointStats.java:184)
	at org.apache.flink.runtime.checkpoint.PendingCheckpoint.reportFailedCheckpoint(PendingCheckpoint.java:517)
	at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:454)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1200)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyJobStatusChange(ExecutionGraph.java:1713)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1370)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1354)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.cancel(ExecutionGraph.java:1000)
	at org.apache.flink.runtime.jobmaster.JobMaster.cancel(JobMaster.java:389)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

2018-11-07 09:54:52,420 ERROR org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   - Implementation error: Unhandled exception.
java.lang.IllegalArgumentException: Negative number of in progress checkpoints
	at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
	at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.<init>(CheckpointStatsCounts.java:72)
	at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.createSnapshot(CheckpointStatsCounts.java:177)
	at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.createSnapshot(CheckpointStatsTracker.java:166)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.getCheckpointStatsSnapshot(ExecutionGraph.java:553)
	at org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:340)
	at org.apache.flink.runtime.jobmaster.JobMaster.requestJob(JobMaster.java:923)
	at sun.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

AFAIK, it’s a know issue[1] but should not affect the cluster shutdown. Does anyone meet this problem before? Thanks a lot!


Best,
Paul Lam
Reply | Threaded
Open this post in threaded view
|

Re: Per job cluster doesn't shut down after the job is canceled

Gary Yao-2
Hi Paul,

Can you share the complete logs, or at least the logs after invoking the
cancel command?

If you want to debug it yourself, check if
MiniDispatcher#jobReachedGloballyTerminalState [1] is invoked, and see how the
jobTerminationFuture is used.

Best,
Gary

[1] https://github.com/apache/flink/blob/091cff3299aed4bb143619324f6ec8165348d3ae/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java#L141


On Wed, Nov 7, 2018 at 3:27 AM Paul Lam <[hidden email]> wrote:
Hi, 

I’m using Flink 1.5.3, and I’ve seen several times that the detached YARN cluster doesn’t shut down after the job is canceled successfully. The only errors I found in jobmanager’s log are as below (the second one appears multiple times):

```
2018-11-07 09:48:38,663 WARN  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Error while notifying JobStatusListener
java.lang.IllegalStateException: Incremented the completed number of checkpoints without incrementing the in progress checkpoints before.
	at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.incrementFailedCheckpoints(CheckpointStatsCounts.java:165)
	at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.reportFailedCheckpoint(CheckpointStatsTracker.java:270)
	at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.access$100(CheckpointStatsTracker.java:55)
	at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker$PendingCheckpointStatsCallback.reportFailedCheckpoint(CheckpointStatsTracker.java:314)
	at org.apache.flink.runtime.checkpoint.PendingCheckpointStats.reportFailedCheckpoint(PendingCheckpointStats.java:184)
	at org.apache.flink.runtime.checkpoint.PendingCheckpoint.reportFailedCheckpoint(PendingCheckpoint.java:517)
	at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:454)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1200)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyJobStatusChange(ExecutionGraph.java:1713)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1370)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1354)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.cancel(ExecutionGraph.java:1000)
	at org.apache.flink.runtime.jobmaster.JobMaster.cancel(JobMaster.java:389)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

2018-11-07 09:54:52,420 ERROR org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   - Implementation error: Unhandled exception.
java.lang.IllegalArgumentException: Negative number of in progress checkpoints
	at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
	at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.<init>(CheckpointStatsCounts.java:72)
	at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.createSnapshot(CheckpointStatsCounts.java:177)
	at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.createSnapshot(CheckpointStatsTracker.java:166)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.getCheckpointStatsSnapshot(ExecutionGraph.java:553)
	at org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:340)
	at org.apache.flink.runtime.jobmaster.JobMaster.requestJob(JobMaster.java:923)
	at sun.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

AFAIK, it’s a know issue[1] but should not affect the cluster shutdown. Does anyone meet this problem before? Thanks a lot!


Best,
Paul Lam
Reply | Threaded
Open this post in threaded view
|

Re: Per job cluster doesn't shut down after the job is canceled

Paul Lam
Hi Gary,

Thanks for your reply and sorry for the delay. The attachment is the jobmanager logs after invoking the cancel command.

I think it might be related to the custom source, because the jobmanager keeps trying to trigger a checkpoint for it, 
but in fact it’s already canceled. The source implementation is using a running flag to denote it’s running, and the 
cancel method is simply setting the flag to false, which I think is a common way of implementing a custom source.
In addition, the cluster finally shut down because I killed it with yarn commands.

And also thank you for the pointer, I’ll keep tracking this problem.

Best,
Paul Lam



在 2018年11月10日,02:10,Gary Yao <[hidden email]> 写道:

Hi Paul,

Can you share the complete logs, or at least the logs after invoking the
cancel command?

If you want to debug it yourself, check if
MiniDispatcher#jobReachedGloballyTerminalState [1] is invoked, and see how the
jobTerminationFuture is used.

Best,
Gary

[1] https://github.com/apache/flink/blob/091cff3299aed4bb143619324f6ec8165348d3ae/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java#L141


On Wed, Nov 7, 2018 at 3:27 AM Paul Lam <[hidden email]> wrote:
Hi, 

I’m using Flink 1.5.3, and I’ve seen several times that the detached YARN cluster doesn’t shut down after the job is canceled successfully. The only errors I found in jobmanager’s log are as below (the second one appears multiple times):

```
2018-11-07 09:48:38,663 WARN  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Error while notifying JobStatusListener
java.lang.IllegalStateException: Incremented the completed number of checkpoints without incrementing the in progress checkpoints before.
	at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.incrementFailedCheckpoints(CheckpointStatsCounts.java:165)
	at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.reportFailedCheckpoint(CheckpointStatsTracker.java:270)
	at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.access$100(CheckpointStatsTracker.java:55)
	at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker$PendingCheckpointStatsCallback.reportFailedCheckpoint(CheckpointStatsTracker.java:314)
	at org.apache.flink.runtime.checkpoint.PendingCheckpointStats.reportFailedCheckpoint(PendingCheckpointStats.java:184)
	at org.apache.flink.runtime.checkpoint.PendingCheckpoint.reportFailedCheckpoint(PendingCheckpoint.java:517)
	at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:454)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1200)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyJobStatusChange(ExecutionGraph.java:1713)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1370)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1354)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.cancel(ExecutionGraph.java:1000)
	at org.apache.flink.runtime.jobmaster.JobMaster.cancel(JobMaster.java:389)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

2018-11-07 09:54:52,420 ERROR org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   - Implementation error: Unhandled exception.
java.lang.IllegalArgumentException: Negative number of in progress checkpoints
	at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
	at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.<init>(CheckpointStatsCounts.java:72)
	at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.createSnapshot(CheckpointStatsCounts.java:177)
	at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.createSnapshot(CheckpointStatsTracker.java:166)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.getCheckpointStatsSnapshot(ExecutionGraph.java:553)
	at org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:340)
	at org.apache.flink.runtime.jobmaster.JobMaster.requestJob(JobMaster.java:923)
	at sun.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

AFAIK, it’s a know issue[1] but should not affect the cluster shutdown. Does anyone meet this problem before? Thanks a lot!


Best,
Paul Lam


failed_to_release_resource.log (364K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Per job cluster doesn't shut down after the job is canceled

Ufuk Celebi-2
Hey Paul,

It might be related to this: https://github.com/apache/flink/pull/7004 (see linked issue for details).

Best,

Ufuk

> On Nov 14, 2018, at 09:46, Paul Lam <[hidden email]> wrote:
>
> Hi Gary,
>
> Thanks for your reply and sorry for the delay. The attachment is the jobmanager logs after invoking the cancel command.
>
> I think it might be related to the custom source, because the jobmanager keeps trying to trigger a checkpoint for it,
> but in fact it’s already canceled. The source implementation is using a running flag to denote it’s running, and the
> cancel method is simply setting the flag to false, which I think is a common way of implementing a custom source.
> In addition, the cluster finally shut down because I killed it with yarn commands.
>
> And also thank you for the pointer, I’ll keep tracking this problem.
>
> Best,
> Paul Lam
>
> <failed_to_release_resource.log>
>
>> 在 2018年11月10日,02:10,Gary Yao <[hidden email]> 写道:
>>
>> Hi Paul,
>>
>> Can you share the complete logs, or at least the logs after invoking the
>> cancel command?
>>
>> If you want to debug it yourself, check if
>> MiniDispatcher#jobReachedGloballyTerminalState [1] is invoked, and see how the
>> jobTerminationFuture is used.
>>
>> Best,
>> Gary
>>
>> [1] https://github.com/apache/flink/blob/091cff3299aed4bb143619324f6ec8165348d3ae/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java#L141
>>
>>
>> On Wed, Nov 7, 2018 at 3:27 AM Paul Lam <[hidden email]> wrote:
>> Hi,
>>
>> I’m using Flink 1.5.3, and I’ve seen several times that the detached YARN cluster doesn’t shut down after the job is canceled successfully. The only errors I found in jobmanager’s log are as below (the second one appears multiple times):
>>
>> ```
>> 2018-11-07 09:48:38,663 WARN  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Error while notifying JobStatusListener
>> java.lang.IllegalStateException: Incremented the completed number of checkpoints without incrementing the in progress checkpoints before.
>> at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.incrementFailedCheckpoints(CheckpointStatsCounts.java:165)
>> at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.reportFailedCheckpoint(CheckpointStatsTracker.java:270)
>> at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.access$100(CheckpointStatsTracker.java:55)
>> at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker$PendingCheckpointStatsCallback.reportFailedCheckpoint(CheckpointStatsTracker.java:314)
>> at org.apache.flink.runtime.checkpoint.PendingCheckpointStats.reportFailedCheckpoint(PendingCheckpointStats.java:184)
>> at org.apache.flink.runtime.checkpoint.PendingCheckpoint.reportFailedCheckpoint(PendingCheckpoint.java:517)
>> at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:454)
>> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1200)
>> at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyJobStatusChange(ExecutionGraph.java:1713)
>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1370)
>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1354)
>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.cancel(ExecutionGraph.java:1000)
>> at org.apache.flink.runtime.jobmaster.JobMaster.cancel(JobMaster.java:389)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>> 2018-11-07 09:54:52,420 ERROR org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   - Implementation error: Unhandled exception.
>> java.lang.IllegalArgumentException: Negative number of in progress checkpoints
>> at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
>> at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.<init>(CheckpointStatsCounts.java:72)
>> at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.createSnapshot(CheckpointStatsCounts.java:177)
>> at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.createSnapshot(CheckpointStatsTracker.java:166)
>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.getCheckpointStatsSnapshot(ExecutionGraph.java:553)
>> at org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:340)
>> at org.apache.flink.runtime.jobmaster.JobMaster.requestJob(JobMaster.java:923)
>> at sun.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> ```
>>
>> AFAIK, it’s a know issue[1] but should not affect the cluster shutdown. Does anyone meet this problem before? Thanks a lot!
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-10482
>>
>> Best,
>> Paul Lam
>

Reply | Threaded
Open this post in threaded view
|

Re: Per job cluster doesn't shut down after the job is canceled

Paul Lam
Hi Ufuk,

Thanks for you reply!

I’m afraid that my case is different. Since the Flink on YARN application is not exited, we do not
have an application exit code yet (but the job status is determined).

Best,
Paul Lam


> 在 2018年11月14日,16:49,Ufuk Celebi <[hidden email]> 写道:
>
> Hey Paul,
>
> It might be related to this: https://github.com/apache/flink/pull/7004 (see linked issue for details).
>
> Best,
>
> Ufuk
>
>> On Nov 14, 2018, at 09:46, Paul Lam <[hidden email]> wrote:
>>
>> Hi Gary,
>>
>> Thanks for your reply and sorry for the delay. The attachment is the jobmanager logs after invoking the cancel command.
>>
>> I think it might be related to the custom source, because the jobmanager keeps trying to trigger a checkpoint for it,
>> but in fact it’s already canceled. The source implementation is using a running flag to denote it’s running, and the
>> cancel method is simply setting the flag to false, which I think is a common way of implementing a custom source.
>> In addition, the cluster finally shut down because I killed it with yarn commands.
>>
>> And also thank you for the pointer, I’ll keep tracking this problem.
>>
>> Best,
>> Paul Lam
>>
>> <failed_to_release_resource.log>
>>
>>> 在 2018年11月10日,02:10,Gary Yao <[hidden email]> 写道:
>>>
>>> Hi Paul,
>>>
>>> Can you share the complete logs, or at least the logs after invoking the
>>> cancel command?
>>>
>>> If you want to debug it yourself, check if
>>> MiniDispatcher#jobReachedGloballyTerminalState [1] is invoked, and see how the
>>> jobTerminationFuture is used.
>>>
>>> Best,
>>> Gary
>>>
>>> [1] https://github.com/apache/flink/blob/091cff3299aed4bb143619324f6ec8165348d3ae/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java#L141
>>>
>>>
>>> On Wed, Nov 7, 2018 at 3:27 AM Paul Lam <[hidden email]> wrote:
>>> Hi,
>>>
>>> I’m using Flink 1.5.3, and I’ve seen several times that the detached YARN cluster doesn’t shut down after the job is canceled successfully. The only errors I found in jobmanager’s log are as below (the second one appears multiple times):
>>>
>>> ```
>>> 2018-11-07 09:48:38,663 WARN  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Error while notifying JobStatusListener
>>> java.lang.IllegalStateException: Incremented the completed number of checkpoints without incrementing the in progress checkpoints before.
>>> at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.incrementFailedCheckpoints(CheckpointStatsCounts.java:165)
>>> at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.reportFailedCheckpoint(CheckpointStatsTracker.java:270)
>>> at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.access$100(CheckpointStatsTracker.java:55)
>>> at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker$PendingCheckpointStatsCallback.reportFailedCheckpoint(CheckpointStatsTracker.java:314)
>>> at org.apache.flink.runtime.checkpoint.PendingCheckpointStats.reportFailedCheckpoint(PendingCheckpointStats.java:184)
>>> at org.apache.flink.runtime.checkpoint.PendingCheckpoint.reportFailedCheckpoint(PendingCheckpoint.java:517)
>>> at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:454)
>>> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1200)
>>> at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyJobStatusChange(ExecutionGraph.java:1713)
>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1370)
>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1354)
>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.cancel(ExecutionGraph.java:1000)
>>> at org.apache.flink.runtime.jobmaster.JobMaster.cancel(JobMaster.java:389)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>>
>>> 2018-11-07 09:54:52,420 ERROR org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   - Implementation error: Unhandled exception.
>>> java.lang.IllegalArgumentException: Negative number of in progress checkpoints
>>> at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
>>> at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.<init>(CheckpointStatsCounts.java:72)
>>> at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.createSnapshot(CheckpointStatsCounts.java:177)
>>> at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.createSnapshot(CheckpointStatsTracker.java:166)
>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.getCheckpointStatsSnapshot(ExecutionGraph.java:553)
>>> at org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:340)
>>> at org.apache.flink.runtime.jobmaster.JobMaster.requestJob(JobMaster.java:923)
>>> at sun.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> ```
>>>
>>> AFAIK, it’s a know issue[1] but should not affect the cluster shutdown. Does anyone meet this problem before? Thanks a lot!
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-10482
>>>
>>> Best,
>>> Paul Lam
>>
>

Reply | Threaded
Open this post in threaded view
|

Re: Per job cluster doesn't shut down after the job is canceled

Gary Yao-2
In reply to this post by Paul Lam
Hi Paul,

Sorry for the late reply. I had a look at the attached log. I think
FLINK-10482 affects the shut down of the "per-job cluster" after all. Here is
the respective stacktrace:

2018-11-06 10:45:17,405 ERROR org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor          - Caught exception while executing runnable in main thread.
java.lang.IllegalArgumentException: Negative number of in progress checkpoints
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
    at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.<init>(CheckpointStatsCounts.java:72)
    at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.createSnapshot(CheckpointStatsCounts.java:177)
    at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.createSnapshot(CheckpointStatsTracker.java:166)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.getCheckpointStatsSnapshot(ExecutionGraph.java:553)
    at org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:340)
    at org.apache.flink.runtime.jobmaster.JobMaster.jobStatusChanged(JobMaster.java:1247)
    at org.apache.flink.runtime.jobmaster.JobMaster.access$1600(JobMaster.java:147)
    at org.apache.flink.runtime.jobmaster.JobMaster$JobManagerJobStatusListener.lambda$jobStatusChanges$0(JobMaster.java:1590)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

We try to create an ArchivedExecutionGraph, which fails because we cannot
snapshot the checkpoint statistics. The subsequent code that should ultimately
shut down the cluster is not executed [1]. If you can tell us how you run into
the "Negative number of in progress checkpoints" problem, we might be able to
come up with a mitigation until FLINK-10482 is fixed.

Best,
Gary

[1] https://github.com/apache/flink/blob/614f2162e42345da7501f8f6ea724a7e0ce65e3c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1247-L1248

On Wed, Nov 14, 2018 at 9:46 AM Paul Lam <[hidden email]> wrote:
Hi Gary,

Thanks for your reply and sorry for the delay. The attachment is the jobmanager logs after invoking the cancel command.

I think it might be related to the custom source, because the jobmanager keeps trying to trigger a checkpoint for it, 
but in fact it’s already canceled. The source implementation is using a running flag to denote it’s running, and the 
cancel method is simply setting the flag to false, which I think is a common way of implementing a custom source.
In addition, the cluster finally shut down because I killed it with yarn commands.

And also thank you for the pointer, I’ll keep tracking this problem.

Best,
Paul Lam


在 2018年11月10日,02:10,Gary Yao <[hidden email]> 写道:

Hi Paul,

Can you share the complete logs, or at least the logs after invoking the
cancel command?

If you want to debug it yourself, check if
MiniDispatcher#jobReachedGloballyTerminalState [1] is invoked, and see how the
jobTerminationFuture is used.

Best,
Gary

[1] https://github.com/apache/flink/blob/091cff3299aed4bb143619324f6ec8165348d3ae/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java#L141


On Wed, Nov 7, 2018 at 3:27 AM Paul Lam <[hidden email]> wrote:
Hi, 

I’m using Flink 1.5.3, and I’ve seen several times that the detached YARN cluster doesn’t shut down after the job is canceled successfully. The only errors I found in jobmanager’s log are as below (the second one appears multiple times):

```
2018-11-07 09:48:38,663 WARN  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Error while notifying JobStatusListener
java.lang.IllegalStateException: Incremented the completed number of checkpoints without incrementing the in progress checkpoints before.
	at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.incrementFailedCheckpoints(CheckpointStatsCounts.java:165)
	at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.reportFailedCheckpoint(CheckpointStatsTracker.java:270)
	at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.access$100(CheckpointStatsTracker.java:55)
	at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker$PendingCheckpointStatsCallback.reportFailedCheckpoint(CheckpointStatsTracker.java:314)
	at org.apache.flink.runtime.checkpoint.PendingCheckpointStats.reportFailedCheckpoint(PendingCheckpointStats.java:184)
	at org.apache.flink.runtime.checkpoint.PendingCheckpoint.reportFailedCheckpoint(PendingCheckpoint.java:517)
	at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:454)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1200)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyJobStatusChange(ExecutionGraph.java:1713)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1370)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1354)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.cancel(ExecutionGraph.java:1000)
	at org.apache.flink.runtime.jobmaster.JobMaster.cancel(JobMaster.java:389)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

2018-11-07 09:54:52,420 ERROR org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   - Implementation error: Unhandled exception.
java.lang.IllegalArgumentException: Negative number of in progress checkpoints
	at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
	at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.<init>(CheckpointStatsCounts.java:72)
	at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.createSnapshot(CheckpointStatsCounts.java:177)
	at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.createSnapshot(CheckpointStatsTracker.java:166)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.getCheckpointStatsSnapshot(ExecutionGraph.java:553)
	at org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:340)
	at org.apache.flink.runtime.jobmaster.JobMaster.requestJob(JobMaster.java:923)
	at sun.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

AFAIK, it’s a know issue[1] but should not affect the cluster shutdown. Does anyone meet this problem before? Thanks a lot!


Best,
Paul Lam