Flink 1.13.0 reactive mode: Job stop and cannot restore from checkpoint

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink 1.13.0 reactive mode: Job stop and cannot restore from checkpoint

ChangZhuo Chen (陳昌倬)
Hi,

We run our application in Flink 1.13.0, Kubernetes standalone
application cluster with reactive mode enabled. The application has
stopped and cannot restore today, so we try to restore the application
from checkpoint. However, the application cannot restart from checkpoint
due to the following error. We have no idea the meaning of this
exception, so any help is welcome.


2021-05-14 01:55:37,204 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Close ResourceManager connection 06d772aae2ab4afb8c6917dac40cd727: Stopping JobMaster for job rt-match_11.2.16_5d671ba3(00000000000000000000000000000000)..
2021-05-14 01:55:37,205 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
2021-05-14 01:55:37,205 INFO  org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver [] - Stopping KubernetesLeaderRetrievalDriver{configMapName='rt-match-flink-dev-resourcemanager-leader'}.
2021-05-14 01:55:37,205 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher [] - The watcher is closing.
2021-05-14 01:55:37,215 INFO  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
2021-05-14 01:55:37,215 INFO  org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionDriver [] - Closing KubernetesLeaderElectionDriver{configMapName='rt-match-flink-dev-00000000000000000000000000000000-jobmanager-leader'}.
2021-05-14 01:55:37,216 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher [] - The watcher is closing.
2021-05-14 01:55:37,342 INFO  org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Removed job graph 00000000000000000000000000000000 from KubernetesStateHandleStore{configMapName='rt-match-flink-dev-dispatcher-leader'}.
2021-05-14 01:55:37,914 INFO  org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application FAILED:
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: Application Status: FAILED
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$unwrapJobResultException$4(ApplicationDispatcherBootstrap.java:304) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680) ~[?:?]
        at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658) ~[?:?]
        at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2094) ~[?:?]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.unwrapJobResultException(ApplicationDispatcherBootstrap.java:297) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$getApplicationResult$3(ApplicationDispatcherBootstrap.java:270) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) ~[?:?]
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) ~[?:?]
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) ~[?:?]
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) ~[?:?]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.getApplicationResult(ApplicationDispatcherBootstrap.java:272) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:221) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:254) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159) [flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) [flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) [flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.0.jar:1.13.0]
Caused by: org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: Application Status: FAILED
        at org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:71) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        ... 29 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:60) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        ... 29 more
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Incompatible subtask mappings: are multiple operators ingesting/producing intermediate results with varying degrees of parallelism?Found RescaleMappings{mappings=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89], [90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119], [120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149], [150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179], [180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209]]} and RescaleMappings{mappings=[[0, 7, 14, 21, 28, 35, 42, 49, 56, 63, 70, 77, 84, 91, 98, 105, 112, 119, 126, 133, 140, 147, 154, 161, 168, 175, 182, 189, 196, 203], [1, 8, 15, 22, 29, 36, 43, 50, 57, 64, 71, 78, 85, 92, 99, 106, 113, 120, 127, 134, 141, 148, 155, 162, 169, 176, 183, 190, 197, 204], [2, 9, 16, 23, 30, 37, 44, 51, 58, 65, 72, 79, 86, 93, 100, 107, 114, 121, 128, 135, 142, 149, 156, 163, 170, 177, 184, 191, 198, 205], [3, 10, 17, 24, 31, 38, 45, 52, 59, 66, 73, 80, 87, 94, 101, 108, 115, 122, 129, 136, 143, 150, 157, 164, 171, 178, 185, 192, 199, 206], [4, 11, 18, 25, 32, 39, 46, 53, 60, 67, 74, 81, 88, 95, 102, 109, 116, 123, 130, 137, 144, 151, 158, 165, 172, 179, 186, 193, 200, 207], [5, 12, 19, 26, 33, 40, 47, 54, 61, 68, 75, 82, 89, 96, 103, 110, 117, 124, 131, 138, 145, 152, 159, 166, 173, 180, 187, 194, 201, 208], [6, 13, 20, 27, 34, 41, 48, 55, 62, 69, 76, 83, 90, 97, 104, 111, 118, 125, 132, 139, 146, 153, 160, 167, 174, 181, 188, 195, 202, 209]]}.
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$switchExecutor$24(FutureUtils.java:1377) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[?:?]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        ... 4 more
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Incompatible subtask mappings: are multiple operators ingesting/producing intermediate results with varying degrees of parallelism?Found RescaleMappings{mappings=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89], [90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119], [120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149], [150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179], [180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209]]} and RescaleMappings{mappings=[[0, 7, 14, 21, 28, 35, 42, 49, 56, 63, 70, 77, 84, 91, 98, 105, 112, 119, 126, 133, 140, 147, 154, 161, 168, 175, 182, 189, 196, 203], [1, 8, 15, 22, 29, 36, 43, 50, 57, 64, 71, 78, 85, 92, 99, 106, 113, 120, 127, 134, 141, 148, 155, 162, 169, 176, 183, 190, 197, 204], [2, 9, 16, 23, 30, 37, 44, 51, 58, 65, 72, 79, 86, 93, 100, 107, 114, 121, 128, 135, 142, 149, 156, 163, 170, 177, 184, 191, 198, 205], [3, 10, 17, 24, 31, 38, 45, 52, 59, 66, 73, 80, 87, 94, 101, 108, 115, 122, 129, 136, 143, 150, 157, 164, 171, 178, 185, 192, 199, 206], [4, 11, 18, 25, 32, 39, 46, 53, 60, 67, 74, 81, 88, 95, 102, 109, 116, 123, 130, 137, 144, 151, 158, 165, 172, 179, 186, 193, 200, 207], [5, 12, 19, 26, 33, 40, 47, 54, 61, 68, 75, 82, 89, 96, 103, 110, 117, 124, 131, 138, 145, 152, 159, 166, 173, 180, 187, 194, 201, 208], [6, 13, 20, 27, 34, 41, 48, 55, 62, 69, 76, 83, 90, 97, 104, 111, 118, 125, 132, 139, 146, 153, 160, 167, 174, 181, 188, 195, 202, 209]]}.
        at org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:59) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
        at java.lang.Thread.run(Thread.java:834) ~[?:?]
Caused by: java.lang.IllegalStateException: Incompatible subtask mappings: are multiple operators ingesting/producing intermediate results with varying degrees of parallelism?Found RescaleMappings{mappings=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89], [90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119], [120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149], [150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179], [180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209]]} and RescaleMappings{mappings=[[0, 7, 14, 21, 28, 35, 42, 49, 56, 63, 70, 77, 84, 91, 98, 105, 112, 119, 126, 133, 140, 147, 154, 161, 168, 175, 182, 189, 196, 203], [1, 8, 15, 22, 29, 36, 43, 50, 57, 64, 71, 78, 85, 92, 99, 106, 113, 120, 127, 134, 141, 148, 155, 162, 169, 176, 183, 190, 197, 204], [2, 9, 16, 23, 30, 37, 44, 51, 58, 65, 72, 79, 86, 93, 100, 107, 114, 121, 128, 135, 142, 149, 156, 163, 170, 177, 184, 191, 198, 205], [3, 10, 17, 24, 31, 38, 45, 52, 59, 66, 73, 80, 87, 94, 101, 108, 115, 122, 129, 136, 143, 150, 157, 164, 171, 178, 185, 192, 199, 206], [4, 11, 18, 25, 32, 39, 46, 53, 60, 67, 74, 81, 88, 95, 102, 109, 116, 123, 130, 137, 144, 151, 158, 165, 172, 179, 186, 193, 200, 207], [5, 12, 19, 26, 33, 40, 47, 54, 61, 68, 75, 82, 89, 96, 103, 110, 117, 124, 131, 138, 145, 152, 159, 166, 173, 180, 187, 194, 201, 208], [6, 13, 20, 27, 34, 41, 48, 55, 62, 69, 76, 83, 90, 97, 104, 111, 118, 125, 132, 139, 146, 153, 160, 167, 174, 181, 188, 195, 202, 209]]}.
        at org.apache.flink.runtime.checkpoint.TaskStateAssignment.checkSubtaskMapping(TaskStateAssignment.java:322) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.checkpoint.TaskStateAssignment.getInputMapping(TaskStateAssignment.java:306) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reDistributeInputChannelStates(StateAssignmentOperation.java:409) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:193) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:139) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1566) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1646) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:163) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:138) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.createExecutionGraphAndRestoreState(AdaptiveScheduler.java:986) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.lambda$createExecutionGraphAndRestoreStateAsync$25(AdaptiveScheduler.java:976) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:57) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
        at java.lang.Thread.run(Thread.java:834) ~[?:?]
2021-05-14 01:55:37,916 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting StandaloneApplicationClusterEntryPoint down with application status FAILED. Diagnostics null.
2021-05-14 01:55:37,917 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.


--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B

signature.asc (849 bytes) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.13.0 reactive mode: Job stop and cannot restore from checkpoint

Till Rohrmann
Hi ChangZhuo Chen,

This looks like a bug in Flink. Could you provide us with the logs of the run and more information about your job? In particular, how does your topology look like?

My suspicion is the following: You have an operator with two inputs. One input is keyed whereas the other input is something else. Due to this property, the JobVertex has two different SubtaskStateMappers assigned which produce different subtask mappings in the TaskStateAssignment. This is the exception you are observing. The initial deployment works because you don't have any state to recover. However, subsequent recoveries should fail. I am also pulling in Arvid who worked on the subtask state assignment recently and might be able to shed some more light on this matter.

Cheers,
Till

On Fri, May 14, 2021 at 4:35 AM ChangZhuo Chen (陳昌倬) <[hidden email]> wrote:
Hi,

We run our application in Flink 1.13.0, Kubernetes standalone
application cluster with reactive mode enabled. The application has
stopped and cannot restore today, so we try to restore the application
from checkpoint. However, the application cannot restart from checkpoint
due to the following error. We have no idea the meaning of this
exception, so any help is welcome.


2021-05-14 01:55:37,204 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Close ResourceManager connection 06d772aae2ab4afb8c6917dac40cd727: Stopping JobMaster for job rt-match_11.2.16_5d671ba3(00000000000000000000000000000000)..
2021-05-14 01:55:37,205 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
2021-05-14 01:55:37,205 INFO  org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver [] - Stopping KubernetesLeaderRetrievalDriver{configMapName='rt-match-flink-dev-resourcemanager-leader'}.
2021-05-14 01:55:37,205 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher [] - The watcher is closing.
2021-05-14 01:55:37,215 INFO  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
2021-05-14 01:55:37,215 INFO  org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionDriver [] - Closing KubernetesLeaderElectionDriver{configMapName='rt-match-flink-dev-00000000000000000000000000000000-jobmanager-leader'}.
2021-05-14 01:55:37,216 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher [] - The watcher is closing.
2021-05-14 01:55:37,342 INFO  org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Removed job graph 00000000000000000000000000000000 from KubernetesStateHandleStore{configMapName='rt-match-flink-dev-dispatcher-leader'}.
2021-05-14 01:55:37,914 INFO  org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application FAILED:
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: Application Status: FAILED
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$unwrapJobResultException$4(ApplicationDispatcherBootstrap.java:304) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680) ~[?:?]
        at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658) ~[?:?]
        at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2094) ~[?:?]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.unwrapJobResultException(ApplicationDispatcherBootstrap.java:297) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$getApplicationResult$3(ApplicationDispatcherBootstrap.java:270) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) ~[?:?]
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) ~[?:?]
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) ~[?:?]
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) ~[?:?]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.getApplicationResult(ApplicationDispatcherBootstrap.java:272) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:221) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:254) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159) [flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) [flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) [flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.0.jar:1.13.0]
Caused by: org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: Application Status: FAILED
        at org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:71) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        ... 29 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:60) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        ... 29 more
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Incompatible subtask mappings: are multiple operators ingesting/producing intermediate results with varying degrees of parallelism?Found RescaleMappings{mappings=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89], [90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119], [120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149], [150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179], [180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209]]} and RescaleMappings{mappings=[[0, 7, 14, 21, 28, 35, 42, 49, 56, 63, 70, 77, 84, 91, 98, 105, 112, 119, 126, 133, 140, 147, 154, 161, 168, 175, 182, 189, 196, 203], [1, 8, 15, 22, 29, 36, 43, 50, 57, 64, 71, 78, 85, 92, 99, 106, 113, 120, 127, 134, 141, 148, 155, 162, 169, 176, 183, 190, 197, 204], [2, 9, 16, 23, 30, 37, 44, 51, 58, 65, 72, 79, 86, 93, 100, 107, 114, 121, 128, 135, 142, 149, 156, 163, 170, 177, 184, 191, 198, 205], [3, 10, 17, 24, 31, 38, 45, 52, 59, 66, 73, 80, 87, 94, 101, 108, 115, 122, 129, 136, 143, 150, 157, 164, 171, 178, 185, 192, 199, 206], [4, 11, 18, 25, 32, 39, 46, 53, 60, 67, 74, 81, 88, 95, 102, 109, 116, 123, 130, 137, 144, 151, 158, 165, 172, 179, 186, 193, 200, 207], [5, 12, 19, 26, 33, 40, 47, 54, 61, 68, 75, 82, 89, 96, 103, 110, 117, 124, 131, 138, 145, 152, 159, 166, 173, 180, 187, 194, 201, 208], [6, 13, 20, 27, 34, 41, 48, 55, 62, 69, 76, 83, 90, 97, 104, 111, 118, 125, 132, 139, 146, 153, 160, 167, 174, 181, 188, 195, 202, 209]]}.
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$switchExecutor$24(FutureUtils.java:1377) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[?:?]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        ... 4 more
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Incompatible subtask mappings: are multiple operators ingesting/producing intermediate results with varying degrees of parallelism?Found RescaleMappings{mappings=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89], [90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119], [120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149], [150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179], [180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209]]} and RescaleMappings{mappings=[[0, 7, 14, 21, 28, 35, 42, 49, 56, 63, 70, 77, 84, 91, 98, 105, 112, 119, 126, 133, 140, 147, 154, 161, 168, 175, 182, 189, 196, 203], [1, 8, 15, 22, 29, 36, 43, 50, 57, 64, 71, 78, 85, 92, 99, 106, 113, 120, 127, 134, 141, 148, 155, 162, 169, 176, 183, 190, 197, 204], [2, 9, 16, 23, 30, 37, 44, 51, 58, 65, 72, 79, 86, 93, 100, 107, 114, 121, 128, 135, 142, 149, 156, 163, 170, 177, 184, 191, 198, 205], [3, 10, 17, 24, 31, 38, 45, 52, 59, 66, 73, 80, 87, 94, 101, 108, 115, 122, 129, 136, 143, 150, 157, 164, 171, 178, 185, 192, 199, 206], [4, 11, 18, 25, 32, 39, 46, 53, 60, 67, 74, 81, 88, 95, 102, 109, 116, 123, 130, 137, 144, 151, 158, 165, 172, 179, 186, 193, 200, 207], [5, 12, 19, 26, 33, 40, 47, 54, 61, 68, 75, 82, 89, 96, 103, 110, 117, 124, 131, 138, 145, 152, 159, 166, 173, 180, 187, 194, 201, 208], [6, 13, 20, 27, 34, 41, 48, 55, 62, 69, 76, 83, 90, 97, 104, 111, 118, 125, 132, 139, 146, 153, 160, 167, 174, 181, 188, 195, 202, 209]]}.
        at org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:59) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
        at java.lang.Thread.run(Thread.java:834) ~[?:?]
Caused by: java.lang.IllegalStateException: Incompatible subtask mappings: are multiple operators ingesting/producing intermediate results with varying degrees of parallelism?Found RescaleMappings{mappings=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89], [90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119], [120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149], [150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179], [180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209]]} and RescaleMappings{mappings=[[0, 7, 14, 21, 28, 35, 42, 49, 56, 63, 70, 77, 84, 91, 98, 105, 112, 119, 126, 133, 140, 147, 154, 161, 168, 175, 182, 189, 196, 203], [1, 8, 15, 22, 29, 36, 43, 50, 57, 64, 71, 78, 85, 92, 99, 106, 113, 120, 127, 134, 141, 148, 155, 162, 169, 176, 183, 190, 197, 204], [2, 9, 16, 23, 30, 37, 44, 51, 58, 65, 72, 79, 86, 93, 100, 107, 114, 121, 128, 135, 142, 149, 156, 163, 170, 177, 184, 191, 198, 205], [3, 10, 17, 24, 31, 38, 45, 52, 59, 66, 73, 80, 87, 94, 101, 108, 115, 122, 129, 136, 143, 150, 157, 164, 171, 178, 185, 192, 199, 206], [4, 11, 18, 25, 32, 39, 46, 53, 60, 67, 74, 81, 88, 95, 102, 109, 116, 123, 130, 137, 144, 151, 158, 165, 172, 179, 186, 193, 200, 207], [5, 12, 19, 26, 33, 40, 47, 54, 61, 68, 75, 82, 89, 96, 103, 110, 117, 124, 131, 138, 145, 152, 159, 166, 173, 180, 187, 194, 201, 208], [6, 13, 20, 27, 34, 41, 48, 55, 62, 69, 76, 83, 90, 97, 104, 111, 118, 125, 132, 139, 146, 153, 160, 167, 174, 181, 188, 195, 202, 209]]}.
        at org.apache.flink.runtime.checkpoint.TaskStateAssignment.checkSubtaskMapping(TaskStateAssignment.java:322) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.checkpoint.TaskStateAssignment.getInputMapping(TaskStateAssignment.java:306) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reDistributeInputChannelStates(StateAssignmentOperation.java:409) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:193) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:139) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1566) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1646) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:163) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:138) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.createExecutionGraphAndRestoreState(AdaptiveScheduler.java:986) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.lambda$createExecutionGraphAndRestoreStateAsync$25(AdaptiveScheduler.java:976) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:57) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
        at java.lang.Thread.run(Thread.java:834) ~[?:?]
2021-05-14 01:55:37,916 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting StandaloneApplicationClusterEntryPoint down with application status FAILED. Diagnostics null.
2021-05-14 01:55:37,917 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.


--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.13.0 reactive mode: Job stop and cannot restore from checkpoint

Till Rohrmann
One small addition: The old mapping looks to use the SubtaskStateMapper.RANGE whereas the new mapping looks to use the SubtaskStateMapper.ROUND_ROBIN.

On Mon, May 17, 2021 at 11:56 AM Till Rohrmann <[hidden email]> wrote:
Hi ChangZhuo Chen,

This looks like a bug in Flink. Could you provide us with the logs of the run and more information about your job? In particular, how does your topology look like?

My suspicion is the following: You have an operator with two inputs. One input is keyed whereas the other input is something else. Due to this property, the JobVertex has two different SubtaskStateMappers assigned which produce different subtask mappings in the TaskStateAssignment. This is the exception you are observing. The initial deployment works because you don't have any state to recover. However, subsequent recoveries should fail. I am also pulling in Arvid who worked on the subtask state assignment recently and might be able to shed some more light on this matter.

Cheers,
Till

On Fri, May 14, 2021 at 4:35 AM ChangZhuo Chen (陳昌倬) <[hidden email]> wrote:
Hi,

We run our application in Flink 1.13.0, Kubernetes standalone
application cluster with reactive mode enabled. The application has
stopped and cannot restore today, so we try to restore the application
from checkpoint. However, the application cannot restart from checkpoint
due to the following error. We have no idea the meaning of this
exception, so any help is welcome.


2021-05-14 01:55:37,204 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Close ResourceManager connection 06d772aae2ab4afb8c6917dac40cd727: Stopping JobMaster for job rt-match_11.2.16_5d671ba3(00000000000000000000000000000000)..
2021-05-14 01:55:37,205 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
2021-05-14 01:55:37,205 INFO  org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver [] - Stopping KubernetesLeaderRetrievalDriver{configMapName='rt-match-flink-dev-resourcemanager-leader'}.
2021-05-14 01:55:37,205 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher [] - The watcher is closing.
2021-05-14 01:55:37,215 INFO  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
2021-05-14 01:55:37,215 INFO  org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionDriver [] - Closing KubernetesLeaderElectionDriver{configMapName='rt-match-flink-dev-00000000000000000000000000000000-jobmanager-leader'}.
2021-05-14 01:55:37,216 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher [] - The watcher is closing.
2021-05-14 01:55:37,342 INFO  org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Removed job graph 00000000000000000000000000000000 from KubernetesStateHandleStore{configMapName='rt-match-flink-dev-dispatcher-leader'}.
2021-05-14 01:55:37,914 INFO  org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application FAILED:
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: Application Status: FAILED
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$unwrapJobResultException$4(ApplicationDispatcherBootstrap.java:304) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680) ~[?:?]
        at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658) ~[?:?]
        at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2094) ~[?:?]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.unwrapJobResultException(ApplicationDispatcherBootstrap.java:297) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$getApplicationResult$3(ApplicationDispatcherBootstrap.java:270) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) ~[?:?]
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) ~[?:?]
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) ~[?:?]
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) ~[?:?]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.getApplicationResult(ApplicationDispatcherBootstrap.java:272) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:221) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:254) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159) [flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) [flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) [flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.0.jar:1.13.0]
Caused by: org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: Application Status: FAILED
        at org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:71) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        ... 29 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:60) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        ... 29 more
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Incompatible subtask mappings: are multiple operators ingesting/producing intermediate results with varying degrees of parallelism?Found RescaleMappings{mappings=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89], [90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119], [120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149], [150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179], [180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209]]} and RescaleMappings{mappings=[[0, 7, 14, 21, 28, 35, 42, 49, 56, 63, 70, 77, 84, 91, 98, 105, 112, 119, 126, 133, 140, 147, 154, 161, 168, 175, 182, 189, 196, 203], [1, 8, 15, 22, 29, 36, 43, 50, 57, 64, 71, 78, 85, 92, 99, 106, 113, 120, 127, 134, 141, 148, 155, 162, 169, 176, 183, 190, 197, 204], [2, 9, 16, 23, 30, 37, 44, 51, 58, 65, 72, 79, 86, 93, 100, 107, 114, 121, 128, 135, 142, 149, 156, 163, 170, 177, 184, 191, 198, 205], [3, 10, 17, 24, 31, 38, 45, 52, 59, 66, 73, 80, 87, 94, 101, 108, 115, 122, 129, 136, 143, 150, 157, 164, 171, 178, 185, 192, 199, 206], [4, 11, 18, 25, 32, 39, 46, 53, 60, 67, 74, 81, 88, 95, 102, 109, 116, 123, 130, 137, 144, 151, 158, 165, 172, 179, 186, 193, 200, 207], [5, 12, 19, 26, 33, 40, 47, 54, 61, 68, 75, 82, 89, 96, 103, 110, 117, 124, 131, 138, 145, 152, 159, 166, 173, 180, 187, 194, 201, 208], [6, 13, 20, 27, 34, 41, 48, 55, 62, 69, 76, 83, 90, 97, 104, 111, 118, 125, 132, 139, 146, 153, 160, 167, 174, 181, 188, 195, 202, 209]]}.
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$switchExecutor$24(FutureUtils.java:1377) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[?:?]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        ... 4 more
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Incompatible subtask mappings: are multiple operators ingesting/producing intermediate results with varying degrees of parallelism?Found RescaleMappings{mappings=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89], [90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119], [120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149], [150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179], [180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209]]} and RescaleMappings{mappings=[[0, 7, 14, 21, 28, 35, 42, 49, 56, 63, 70, 77, 84, 91, 98, 105, 112, 119, 126, 133, 140, 147, 154, 161, 168, 175, 182, 189, 196, 203], [1, 8, 15, 22, 29, 36, 43, 50, 57, 64, 71, 78, 85, 92, 99, 106, 113, 120, 127, 134, 141, 148, 155, 162, 169, 176, 183, 190, 197, 204], [2, 9, 16, 23, 30, 37, 44, 51, 58, 65, 72, 79, 86, 93, 100, 107, 114, 121, 128, 135, 142, 149, 156, 163, 170, 177, 184, 191, 198, 205], [3, 10, 17, 24, 31, 38, 45, 52, 59, 66, 73, 80, 87, 94, 101, 108, 115, 122, 129, 136, 143, 150, 157, 164, 171, 178, 185, 192, 199, 206], [4, 11, 18, 25, 32, 39, 46, 53, 60, 67, 74, 81, 88, 95, 102, 109, 116, 123, 130, 137, 144, 151, 158, 165, 172, 179, 186, 193, 200, 207], [5, 12, 19, 26, 33, 40, 47, 54, 61, 68, 75, 82, 89, 96, 103, 110, 117, 124, 131, 138, 145, 152, 159, 166, 173, 180, 187, 194, 201, 208], [6, 13, 20, 27, 34, 41, 48, 55, 62, 69, 76, 83, 90, 97, 104, 111, 118, 125, 132, 139, 146, 153, 160, 167, 174, 181, 188, 195, 202, 209]]}.
        at org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:59) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
        at java.lang.Thread.run(Thread.java:834) ~[?:?]
Caused by: java.lang.IllegalStateException: Incompatible subtask mappings: are multiple operators ingesting/producing intermediate results with varying degrees of parallelism?Found RescaleMappings{mappings=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89], [90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119], [120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149], [150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179], [180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209]]} and RescaleMappings{mappings=[[0, 7, 14, 21, 28, 35, 42, 49, 56, 63, 70, 77, 84, 91, 98, 105, 112, 119, 126, 133, 140, 147, 154, 161, 168, 175, 182, 189, 196, 203], [1, 8, 15, 22, 29, 36, 43, 50, 57, 64, 71, 78, 85, 92, 99, 106, 113, 120, 127, 134, 141, 148, 155, 162, 169, 176, 183, 190, 197, 204], [2, 9, 16, 23, 30, 37, 44, 51, 58, 65, 72, 79, 86, 93, 100, 107, 114, 121, 128, 135, 142, 149, 156, 163, 170, 177, 184, 191, 198, 205], [3, 10, 17, 24, 31, 38, 45, 52, 59, 66, 73, 80, 87, 94, 101, 108, 115, 122, 129, 136, 143, 150, 157, 164, 171, 178, 185, 192, 199, 206], [4, 11, 18, 25, 32, 39, 46, 53, 60, 67, 74, 81, 88, 95, 102, 109, 116, 123, 130, 137, 144, 151, 158, 165, 172, 179, 186, 193, 200, 207], [5, 12, 19, 26, 33, 40, 47, 54, 61, 68, 75, 82, 89, 96, 103, 110, 117, 124, 131, 138, 145, 152, 159, 166, 173, 180, 187, 194, 201, 208], [6, 13, 20, 27, 34, 41, 48, 55, 62, 69, 76, 83, 90, 97, 104, 111, 118, 125, 132, 139, 146, 153, 160, 167, 174, 181, 188, 195, 202, 209]]}.
        at org.apache.flink.runtime.checkpoint.TaskStateAssignment.checkSubtaskMapping(TaskStateAssignment.java:322) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.checkpoint.TaskStateAssignment.getInputMapping(TaskStateAssignment.java:306) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reDistributeInputChannelStates(StateAssignmentOperation.java:409) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:193) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:139) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1566) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1646) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:163) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:138) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.createExecutionGraphAndRestoreState(AdaptiveScheduler.java:986) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.lambda$createExecutionGraphAndRestoreStateAsync$25(AdaptiveScheduler.java:976) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:57) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
        at java.lang.Thread.run(Thread.java:834) ~[?:?]
2021-05-14 01:55:37,916 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting StandaloneApplicationClusterEntryPoint down with application status FAILED. Diagnostics null.
2021-05-14 01:55:37,917 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.


--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.13.0 reactive mode: Job stop and cannot restore from checkpoint

Arvid Heise-4
Hi ChangZhuo,

This looks indeed like a bug. I created FLINK-22686 [1] to track it. It looks unrelated to reactive mode to me and more related to unaligned checkpoints. So, you can try out reactive mode with aligned checkpoints.

If you can provide us with the topology, we can also fix it soonish: 1.13.1 is on the horizon because of a license issue.


On Mon, May 17, 2021 at 12:00 PM Till Rohrmann <[hidden email]> wrote:
One small addition: The old mapping looks to use the SubtaskStateMapper.RANGE whereas the new mapping looks to use the SubtaskStateMapper.ROUND_ROBIN.

On Mon, May 17, 2021 at 11:56 AM Till Rohrmann <[hidden email]> wrote:
Hi ChangZhuo Chen,

This looks like a bug in Flink. Could you provide us with the logs of the run and more information about your job? In particular, how does your topology look like?

My suspicion is the following: You have an operator with two inputs. One input is keyed whereas the other input is something else. Due to this property, the JobVertex has two different SubtaskStateMappers assigned which produce different subtask mappings in the TaskStateAssignment. This is the exception you are observing. The initial deployment works because you don't have any state to recover. However, subsequent recoveries should fail. I am also pulling in Arvid who worked on the subtask state assignment recently and might be able to shed some more light on this matter.

Cheers,
Till

On Fri, May 14, 2021 at 4:35 AM ChangZhuo Chen (陳昌倬) <[hidden email]> wrote:
Hi,

We run our application in Flink 1.13.0, Kubernetes standalone
application cluster with reactive mode enabled. The application has
stopped and cannot restore today, so we try to restore the application
from checkpoint. However, the application cannot restart from checkpoint
due to the following error. We have no idea the meaning of this
exception, so any help is welcome.


2021-05-14 01:55:37,204 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Close ResourceManager connection 06d772aae2ab4afb8c6917dac40cd727: Stopping JobMaster for job rt-match_11.2.16_5d671ba3(00000000000000000000000000000000)..
2021-05-14 01:55:37,205 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
2021-05-14 01:55:37,205 INFO  org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver [] - Stopping KubernetesLeaderRetrievalDriver{configMapName='rt-match-flink-dev-resourcemanager-leader'}.
2021-05-14 01:55:37,205 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher [] - The watcher is closing.
2021-05-14 01:55:37,215 INFO  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
2021-05-14 01:55:37,215 INFO  org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionDriver [] - Closing KubernetesLeaderElectionDriver{configMapName='rt-match-flink-dev-00000000000000000000000000000000-jobmanager-leader'}.
2021-05-14 01:55:37,216 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher [] - The watcher is closing.
2021-05-14 01:55:37,342 INFO  org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Removed job graph 00000000000000000000000000000000 from KubernetesStateHandleStore{configMapName='rt-match-flink-dev-dispatcher-leader'}.
2021-05-14 01:55:37,914 INFO  org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application FAILED:
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: Application Status: FAILED
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$unwrapJobResultException$4(ApplicationDispatcherBootstrap.java:304) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680) ~[?:?]
        at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658) ~[?:?]
        at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2094) ~[?:?]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.unwrapJobResultException(ApplicationDispatcherBootstrap.java:297) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$getApplicationResult$3(ApplicationDispatcherBootstrap.java:270) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) ~[?:?]
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) ~[?:?]
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) ~[?:?]
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) ~[?:?]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.getApplicationResult(ApplicationDispatcherBootstrap.java:272) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:221) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:254) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159) [flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) [flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) [flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.0.jar:1.13.0]
Caused by: org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: Application Status: FAILED
        at org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:71) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        ... 29 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:60) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        ... 29 more
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Incompatible subtask mappings: are multiple operators ingesting/producing intermediate results with varying degrees of parallelism?Found RescaleMappings{mappings=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89], [90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119], [120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149], [150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179], [180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209]]} and RescaleMappings{mappings=[[0, 7, 14, 21, 28, 35, 42, 49, 56, 63, 70, 77, 84, 91, 98, 105, 112, 119, 126, 133, 140, 147, 154, 161, 168, 175, 182, 189, 196, 203], [1, 8, 15, 22, 29, 36, 43, 50, 57, 64, 71, 78, 85, 92, 99, 106, 113, 120, 127, 134, 141, 148, 155, 162, 169, 176, 183, 190, 197, 204], [2, 9, 16, 23, 30, 37, 44, 51, 58, 65, 72, 79, 86, 93, 100, 107, 114, 121, 128, 135, 142, 149, 156, 163, 170, 177, 184, 191, 198, 205], [3, 10, 17, 24, 31, 38, 45, 52, 59, 66, 73, 80, 87, 94, 101, 108, 115, 122, 129, 136, 143, 150, 157, 164, 171, 178, 185, 192, 199, 206], [4, 11, 18, 25, 32, 39, 46, 53, 60, 67, 74, 81, 88, 95, 102, 109, 116, 123, 130, 137, 144, 151, 158, 165, 172, 179, 186, 193, 200, 207], [5, 12, 19, 26, 33, 40, 47, 54, 61, 68, 75, 82, 89, 96, 103, 110, 117, 124, 131, 138, 145, 152, 159, 166, 173, 180, 187, 194, 201, 208], [6, 13, 20, 27, 34, 41, 48, 55, 62, 69, 76, 83, 90, 97, 104, 111, 118, 125, 132, 139, 146, 153, 160, 167, 174, 181, 188, 195, 202, 209]]}.
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$switchExecutor$24(FutureUtils.java:1377) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[?:?]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        ... 4 more
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Incompatible subtask mappings: are multiple operators ingesting/producing intermediate results with varying degrees of parallelism?Found RescaleMappings{mappings=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89], [90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119], [120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149], [150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179], [180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209]]} and RescaleMappings{mappings=[[0, 7, 14, 21, 28, 35, 42, 49, 56, 63, 70, 77, 84, 91, 98, 105, 112, 119, 126, 133, 140, 147, 154, 161, 168, 175, 182, 189, 196, 203], [1, 8, 15, 22, 29, 36, 43, 50, 57, 64, 71, 78, 85, 92, 99, 106, 113, 120, 127, 134, 141, 148, 155, 162, 169, 176, 183, 190, 197, 204], [2, 9, 16, 23, 30, 37, 44, 51, 58, 65, 72, 79, 86, 93, 100, 107, 114, 121, 128, 135, 142, 149, 156, 163, 170, 177, 184, 191, 198, 205], [3, 10, 17, 24, 31, 38, 45, 52, 59, 66, 73, 80, 87, 94, 101, 108, 115, 122, 129, 136, 143, 150, 157, 164, 171, 178, 185, 192, 199, 206], [4, 11, 18, 25, 32, 39, 46, 53, 60, 67, 74, 81, 88, 95, 102, 109, 116, 123, 130, 137, 144, 151, 158, 165, 172, 179, 186, 193, 200, 207], [5, 12, 19, 26, 33, 40, 47, 54, 61, 68, 75, 82, 89, 96, 103, 110, 117, 124, 131, 138, 145, 152, 159, 166, 173, 180, 187, 194, 201, 208], [6, 13, 20, 27, 34, 41, 48, 55, 62, 69, 76, 83, 90, 97, 104, 111, 118, 125, 132, 139, 146, 153, 160, 167, 174, 181, 188, 195, 202, 209]]}.
        at org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:59) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
        at java.lang.Thread.run(Thread.java:834) ~[?:?]
Caused by: java.lang.IllegalStateException: Incompatible subtask mappings: are multiple operators ingesting/producing intermediate results with varying degrees of parallelism?Found RescaleMappings{mappings=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89], [90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119], [120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149], [150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179], [180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209]]} and RescaleMappings{mappings=[[0, 7, 14, 21, 28, 35, 42, 49, 56, 63, 70, 77, 84, 91, 98, 105, 112, 119, 126, 133, 140, 147, 154, 161, 168, 175, 182, 189, 196, 203], [1, 8, 15, 22, 29, 36, 43, 50, 57, 64, 71, 78, 85, 92, 99, 106, 113, 120, 127, 134, 141, 148, 155, 162, 169, 176, 183, 190, 197, 204], [2, 9, 16, 23, 30, 37, 44, 51, 58, 65, 72, 79, 86, 93, 100, 107, 114, 121, 128, 135, 142, 149, 156, 163, 170, 177, 184, 191, 198, 205], [3, 10, 17, 24, 31, 38, 45, 52, 59, 66, 73, 80, 87, 94, 101, 108, 115, 122, 129, 136, 143, 150, 157, 164, 171, 178, 185, 192, 199, 206], [4, 11, 18, 25, 32, 39, 46, 53, 60, 67, 74, 81, 88, 95, 102, 109, 116, 123, 130, 137, 144, 151, 158, 165, 172, 179, 186, 193, 200, 207], [5, 12, 19, 26, 33, 40, 47, 54, 61, 68, 75, 82, 89, 96, 103, 110, 117, 124, 131, 138, 145, 152, 159, 166, 173, 180, 187, 194, 201, 208], [6, 13, 20, 27, 34, 41, 48, 55, 62, 69, 76, 83, 90, 97, 104, 111, 118, 125, 132, 139, 146, 153, 160, 167, 174, 181, 188, 195, 202, 209]]}.
        at org.apache.flink.runtime.checkpoint.TaskStateAssignment.checkSubtaskMapping(TaskStateAssignment.java:322) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.checkpoint.TaskStateAssignment.getInputMapping(TaskStateAssignment.java:306) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reDistributeInputChannelStates(StateAssignmentOperation.java:409) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:193) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:139) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1566) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1646) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:163) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:138) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.createExecutionGraphAndRestoreState(AdaptiveScheduler.java:986) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.lambda$createExecutionGraphAndRestoreStateAsync$25(AdaptiveScheduler.java:976) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:57) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
        at java.lang.Thread.run(Thread.java:834) ~[?:?]
2021-05-14 01:55:37,916 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting StandaloneApplicationClusterEntryPoint down with application status FAILED. Diagnostics null.
2021-05-14 01:55:37,917 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.


--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.13.0 reactive mode: Job stop and cannot restore from checkpoint

ChangZhuo Chen (陳昌倬)
On Mon, May 17, 2021 at 01:22:16PM +0200, Arvid Heise wrote:

> Hi ChangZhuo,
>
> This looks indeed like a bug. I created FLINK-22686 [1] to track it. It
> looks unrelated to reactive mode to me and more related to unaligned
> checkpoints. So, you can try out reactive mode with aligned checkpoints.
>
> If you can provide us with the topology, we can also fix it soonish: 1.13.1
> is on the horizon because of a license issue.
>
> [1] https://issues.apache.org/jira/browse/FLINK-22686
Hi Arvid,

We have upload topology to [0], please help to see if that is enough for
debugging, thanks.


[0] https://issues.apache.org/jira/browse/FLINK-22686


--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B

signature.asc (849 bytes) Download Attachment