Hi,
I've been running into an issue where the client frontend sometimes fails to get the cancel-with-savepoint response from the JobManager. I'm using Flink 1.5.3 on a YARN cluster, and when I execute the cancel-with-savepoint command (FsStateBackend backed by HDFS), it normally finishes in seconds, but sometimes it takes unexpectedly long and ends up with `RetryException: Could not complete the operation. Number of retries has been exhausted.` However, the server-side logs look normal: the savepoint is stored fine and the job is also canceled. I haven't looked into the code yet, but I suspect the response was lost while the JobManager was already shutting down, so when the client retries it can't get any response, and it finally gets a connection-refused exception once the shutdown completes (the logs are at the end of the mail). Does my analysis make sense? And how should I debug or fix this problem? Any help would be appreciated, thank you very much!

```
client side

Commit Succeeded

org.apache.flink.util.FlinkException: Could not cancel job e3317308798e962270b95102a1afe3b7.
    at org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:585)
    at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:960)
    at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:577)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1034)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.flink.client.program.rest.RestClusterClient.cancelWithSavepoint(RestClusterClient.java:398)
    at org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:583)
    ... 9 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:274)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: java.net.ConnectException: Connection refused: 10.191.56.77:27597
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
    ... 16 more
Caused by: java.net.ConnectException: Connection refused: 10.191.56.77:27597
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281)
    ... 7 more
```

```
server side (skipped some debug level logs)

2018-09-13 18:00:51,410 INFO org.apache.flink.runtime.jobmaster.JobMaster - Savepoint stored in hdfs://horton/flink/horton/savepoints/e3317308798e962270b95102a1afe3b7/be7e952cff5344658dab4a8376d64742/savepoint-e33173-62bbcb9eb8a5. Now cancelling e3317308798e962270b95102a1afe3b7.
2018-09-13 18:00:51,410 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint state: OperatorState(operatorID: feca28aff5a3958840bee985ee7de4d3, parallelism: 2, maxParallelism: 128, sub task states: 2, total size (bytes): 11488)
2018-09-13 18:00:51,411 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job LogStream-sigma-panama-panama_h48_serverlog_testing (e3317308798e962270b95102a1afe3b7) switched from state RUNNING to CANCELLING.
2018-09-13 18:00:51,411 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/2) (7e8374fe5fb448a9efe2ada838ec89ef) switched from RUNNING to CANCELING.
2018-09-13 18:00:51,411 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (2/2) (d7b7be0353d6db13ccc01545a81e49f8) switched from RUNNING to CANCELING.
2018-09-13 18:00:51,411 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource (1/2) (3fdfeec06eab17e166939ad6b2c8de79) switched from RUNNING to CANCELING.
2018-09-13 18:00:51,411 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource (2/2) (a0fc76872049e7bbcbf97fd27661ab24) switched from RUNNING to CANCELING.
2018-09-13 18:00:51,411 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: LogKafkaSink (1/2) (83845fb6e8f19f8ca81a56ab70ea3093) switched from RUNNING to CANCELING.
2018-09-13 18:00:51,411 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: LogKafkaSink (2/2) (dc3dc6a9cb3177960a98ae57beeaab79) switched from RUNNING to CANCELING.
2018-09-13 18:00:51,419 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (2/2) (d7b7be0353d6db13ccc01545a81e49f8) switched from CANCELING to CANCELED.
2018-09-13 18:00:51,419 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex Source: Custom Source (2/2) - execution #1 to FAILED while being CANCELED.
2018-09-13 18:00:51,419 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot [SlotRequestId{bc044e1b2a4f4560517001551b164afc}] because: Slot is being returned to the SlotPool.
2018-09-13 18:00:51,419 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource (1/2) (3fdfeec06eab17e166939ad6b2c8de79) switched from CANCELING to CANCELED.
2018-09-13 18:00:51,419 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex Source: KafkaSource (1/2) - execution #1 to FAILED while being CANCELED.
2018-09-13 18:00:51,419 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot [SlotRequestId{d992569e383e1d991c7b4434510e1bde}] because: Slot is being returned to the SlotPool.
2018-09-13 18:00:51,420 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/2) (7e8374fe5fb448a9efe2ada838ec89ef) switched from CANCELING to CANCELED.
2018-09-13 18:00:51,420 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex Source: Custom Source (1/2) - execution #1 to FAILED while being CANCELED.
2018-09-13 18:00:51,420 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot [SlotRequestId{76ec5acfc56cdfccb7af720fc31af31a}] because: Slot is being returned to the SlotPool.
2018-09-13 18:00:51,420 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource (2/2) (a0fc76872049e7bbcbf97fd27661ab24) switched from CANCELING to CANCELED.
2018-09-13 18:00:51,421 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex Source: KafkaSource (2/2) - execution #1 to FAILED while being CANCELED.
2018-09-13 18:00:51,421 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot [SlotRequestId{176fa66fe26049ead6813fcb313ac7e9}] because: Slot is being returned to the SlotPool.
2018-09-13 18:00:51,425 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: LogKafkaSink (1/2) (83845fb6e8f19f8ca81a56ab70ea3093) switched from CANCELING to CANCELED.
2018-09-13 18:00:51,425 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: LogKafkaSink (1/2) - execution #1 to FAILED while being CANCELED.
2018-09-13 18:00:51,425 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot [SlotRequestId{4ee2fdd667af29402f86ef0afa090e25}] because: Slot is being returned to the SlotPool.
2018-09-13 18:00:51,425 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot [SlotRequestId{301a0fd32ef0f4e0001973cc96f05526}] because: Release multi task slot because all children have been released.
2018-09-13 18:00:51,425 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Adding returned slot [AllocationID{e64791bd80fb2c37b9ee1448466330e1}] to available slots
2018-09-13 18:00:51,425 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: LogKafkaSink (2/2) (dc3dc6a9cb3177960a98ae57beeaab79) switched from CANCELING to CANCELED.
2018-09-13 18:00:51,425 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: LogKafkaSink (2/2) - execution #1 to FAILED while being CANCELED.
2018-09-13 18:00:51,425 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot [SlotRequestId{7c3e3b9fb6f6ea9b20aa63bf6af5b12e}] because: Slot is being returned to the SlotPool.
2018-09-13 18:00:51,425 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job LogStream-sigma-panama-panama_h48_serverlog_testing (e3317308798e962270b95102a1afe3b7) switched from state CANCELLING to CANCELED.
2018-09-13 18:00:51,425 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job e3317308798e962270b95102a1afe3b7.
2018-09-13 18:00:51,426 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Shutting down
2018-09-13 18:00:51,428 INFO org.apache.flink.runtime.dispatcher.MiniDispatcher - Job e3317308798e962270b95102a1afe3b7 reached globally terminal state CANCELED.
2018-09-13 18:00:51,769 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Shut down and terminate YarnJobClusterEntrypoint with return code 1444 and application status CANCELED.
2018-09-13 18:00:51,769 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Stopping YarnJobClusterEntrypoint.
2018-09-13 18:00:51,769 INFO org.apache.flink.yarn.YarnResourceManager - Shut down cluster because application is in CANCELED, diagnostics null.
2018-09-13 18:00:51,769 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
2018-09-13 18:00:51,770 INFO org.apache.flink.yarn.YarnResourceManager - Unregister application from the YARN Resource Manager with final status KILLED.
2018-09-13 18:00:51,775 INFO org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl - Waiting for application to be successfully unregistered.
2018-09-13 18:00:51,789 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Suspending SlotPool.
2018-09-13 18:00:51,789 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection 5eeea30d3582d0bfba2d0ec58f4da18c.
org.apache.flink.util.FlinkException: JobManager is shutting down.
    at org.apache.flink.runtime.jobmaster.JobMaster.postStop(JobMaster.java:365)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.postStop(FencedAkkaRpcActor.java:40)
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
    at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
    at akka.actor.ActorCell.terminate(ActorCell.scala:374)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2018-09-13 18:00:51,789 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcActor - The rpc endpoint org.apache.flink.runtime.jobmaster.slotpool.SlotPool has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.LocalRpcInvocation until processing is started.
2018-09-13 18:00:51,794 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Stopping SlotPool.
2018-09-13 18:00:52,890 INFO org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl - backgroundOperationsLoop exiting
2018-09-13 18:00:52,922 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Session: 0x16324f962f69800 closed
2018-09-13 18:00:52,923 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x16324f962f69800
2018-09-13 18:00:52,923 INFO org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache
2018-09-13 18:00:52,932 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopping Akka RPC service.
2018-09-13 18:00:52,935 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.
2018-09-13 18:00:52,936 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.
2018-09-13 18:00:52,960 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting shut down.
```

Best,
Paul Lam
Hi, Paul
Hi Devin,
Thanks for the reply! It seems I missed an important thread. @vino mentioned a solution that splits the cancel-with-savepoint operation into two separate operations, and I wonder whether it breaks the end-to-end exactly-once semantics in the case of an at-least-once sink?

Thanks a lot!

Best,
Paul Lam
Hi Paul,

It does not affect anything. It simply splits the combined cancel-with-savepoint into two separate operations: the cancel is not triggered until the savepoint operation has completed.

Thanks,
vino.

Paul Lam <[hidden email]> wrote on Friday, September 14, 2018 at 11:12 AM:
Hi vino,
Thank you for the helpful information! One more question: are these operations supposed to run concurrently, so that the JobManager receives the cancel request before the savepoint is completed?

Best,
Paul Lam
Hi Paul,

As far as the program goes, it is just a few lines of code:

CompletableFuture<String> savepointPath = client.triggerSavepoint();
savepointPath.get(); // block until the savepoint has completed
client.cancel();

Please note that this is just an example, not a real program, and there may be deviations.

Thanks,
vino.

Paul Lam <[hidden email]> wrote on Friday, September 14, 2018 at 12:26 PM:
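For readers who want a slightly fuller version of the sketch above, something along the following lines should work against the `ClusterClient` API. This is only an illustrative sketch: the job ID and savepoint directory below are placeholders, and the exact method signatures may vary between Flink versions.

```java
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;

public class SavepointThenCancel {

    // Sketch only: "client" is assumed to be an already-connected ClusterClient
    // (e.g. a RestClusterClient); the job id and target directory are placeholders.
    static void savepointThenCancel(ClusterClient<?> client) throws Exception {
        JobID jobId = JobID.fromHexString("e3317308798e962270b95102a1afe3b7");
        String targetDirectory = "hdfs:///flink/savepoints";

        // 1. Trigger the savepoint and wait until it has been stored.
        CompletableFuture<String> savepointFuture =
                client.triggerSavepoint(jobId, targetDirectory);
        String savepointPath = savepointFuture.get(); // blocks until the savepoint completes
        System.out.println("Savepoint stored at " + savepointPath);

        // 2. Only after the savepoint has completed, cancel the job.
        client.cancel(jobId);
    }
}
```

Blocking on the savepoint future before issuing the cancel preserves the same ordering guarantee as the combined cancel-with-savepoint call: the job keeps running until the savepoint is durably stored.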
Hi Paul,

Your analysis is right. The JobManager does not wait for pending operation results to be properly served. See https://issues.apache.org/jira/browse/FLINK-10309 for more details. I think a way to solve it is to wait for some time if the RestServerEndpoint still has some responses to serve.

Cheers,
Till

On Fri, Sep 14, 2018 at 4:41 AM Paul Lam <[hidden email]> wrote:
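Until FLINK-10309 is addressed, one possible client-side mitigation is to treat a retry-exhausted cancel-with-savepoint as "possibly succeeded" and verify directly on HDFS whether a savepoint was actually written before alerting or retrying. The helper below is only an assumed, illustrative sketch (it is not part of Flink, and the directory layout it inspects may need adjusting to your setup); it uses the standard Hadoop FileSystem API.

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class SavepointVerifier {

    // Hypothetical helper for illustration: after cancelWithSavepoint fails with
    // "Number of retries has been exhausted", check on HDFS whether a savepoint
    // was in fact written under the job's savepoint directory, instead of
    // assuming the operation failed. This only inspects HDFS; it does not talk
    // to the (possibly already gone) JobManager.
    static boolean savepointWasWritten(String jobSavepointDir) throws Exception {
        Configuration conf = new Configuration(); // picks up HADOOP_CONF_DIR settings
        FileSystem fs = FileSystem.get(URI.create(jobSavepointDir), conf);

        Path dir = new Path(jobSavepointDir);
        if (!fs.exists(dir)) {
            return false;
        }
        // A completed savepoint directory contains a _metadata file.
        RemoteIterator<LocatedFileStatus> files = fs.listFiles(dir, true);
        while (files.hasNext()) {
            if (files.next().getPath().getName().equals("_metadata")) {
                return true;
            }
        }
        return false;
    }
}
```

For the job in the logs above, the check would be called with something like `savepointWasWritten("hdfs://horton/flink/horton/savepoints/e3317308798e962270b95102a1afe3b7")`.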