Hi all,

I have a problem when running my application on more than one Task Manager.

Setup:
node1: Job Manager, Task Manager
node2: Task Manager

I can run my program successfully on either node alone when I stop the other Task Manager. But when I start both and set parallelism = 2, every time I get the following exception (after 30 seconds):

org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition 6372b5f434d55e987ea179d6f6b488fe@e389ca50a2c2cf776b90268f987a6546 not found.
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:273)
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:182)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:400)
    at org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1294)
    at org.apache.flink.runtime.taskmanager.Task.lambda$triggerPartitionProducerStateCheck$0(Task.java:1151)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

It seems the problem occurs when a subtask is linked to both Task Managers. Does anybody know how I can make it work?

Thanks,
Steffen
Hi Steffen,

You can take a look at this: https://github.com/apache/flink/pull/6103. Hope it helps!

Cheers,
Minglei
Hi Steffen,

This exception indicates that when the downstream task requested the partition from the upstream task, the upstream task had not yet registered its result partition. In this case, the downstream task queries the partition state from the JobManager and then retries the partition request from the upstream until the maximum retry timeout is reached.

You can increase the parameter "taskmanager.network.request-backoff.max" to check whether it helps; the default value is 10 s.

BTW, you should check why the upstream registers its result partition late: maybe the upstream TaskManager received the task deployment from the JobManager with a delay, or some operation during upstream task initialization unexpectedly took extra time before the result partition was registered.

Best,
Zhijiang
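[Editor's note: as a concrete sketch of the suggestion above, the backoff can be raised in flink-conf.yaml. The 30 s value below is purely illustrative, not a recommendation, and the `initial` key is shown only for context:]

```yaml
# flink-conf.yaml -- partition-request retry backoff (milliseconds).
# The downstream task retries its partition request with increasing backoff;
# raising the max gives the upstream task more time to register its result
# partition before the request finally fails with PartitionNotFoundException.
taskmanager.network.request-backoff.initial: 100    # default initial backoff
taskmanager.network.request-backoff.max: 30000      # default is 10000 (10 s)
```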
Hi Zhijiang, Minglei, all,
Both of your hints and explanations worked well. Thank you very much!

Thanks,
Steffen