Network PartitionNotFoundException when run on multi nodes

Steffen Wohlers
Hi all,

I have some problems when running my application on more than one Task Manager.

setup:
node1: Job Manager, Task Manager
node2: Task Manager

I can run my program successfully on each node alone when I stop the other Task Manager.
But when I start both and set parallelism = 2, I get the following exception every time (after 30 seconds):

org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition 6372b5f434d55e987ea179d6f6b488fe@e389ca50a2c2cf776b90268f987a6546 not found.
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:273)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:182)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:400)
	at org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1294)
	at org.apache.flink.runtime.taskmanager.Task.lambda$triggerPartitionProducerStateCheck$0(Task.java:1151)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



It seems the problem occurs when a subtask is linked to both Task Managers.

Does anybody know how I can make it work?

Thanks,
Steffen


Re: Network PartitionNotFoundException when run on multi nodes

zhangminglei
Hi, Steffen

You can take a look at this: https://github.com/apache/flink/pull/6103. Hope it helps!

Cheers
Minglei

On 22 Jul 2018, at 22:22, Steffen Wohlers <[hidden email]> wrote:

Re: Network PartitionNotFoundException when run on multi nodes

Zhijiang (wangzhijiang999)
In reply to this post by Steffen Wohlers
Hi Steffen,

This exception indicates that when the downstream task requested the partition from the upstream task, the upstream task had not yet been initialized far enough to register its result partition.
In this case, the downstream task asks the JobManager for the state of the producer and then retries the partition request against the upstream task until the maximum back-off is reached. You can increase the parameter "taskmanager.network.request-backoff.max" to check whether that helps; its default value is 10s.
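To see why raising the maximum buys the producer more time, here is a minimal Java sketch of the retry budget. It is a simplified model, not Flink's code: it assumes the back-off starts at an initial value (Flink's "taskmanager.network.request-backoff.initial", assumed here to default to 100 ms), doubles after each failed request, is capped at "taskmanager.network.request-backoff.max", and that the consumer gives up once a retry has already been made at the cap. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the consumer's partition-request retries.
// ASSUMPTIONS: exponential doubling from an initial back-off, capped at the
// configured maximum; after one retry at the cap the request is failed
// (which is where a PartitionNotFoundException would surface).
final class BackoffSchedule {

    /** Successive back-off delays in ms before the request is given up. */
    static List<Long> delays(long initialMs, long maxMs) {
        if (initialMs <= 0 || maxMs < initialMs) {
            throw new IllegalArgumentException("need 0 < initialMs <= maxMs");
        }
        List<Long> result = new ArrayList<>();
        long current = 0;
        while (true) {
            if (current == 0) {
                current = initialMs;                     // first retry
            } else if (current < maxMs) {
                current = Math.min(current * 2, maxMs);  // double, never exceed the cap
            } else {
                break;                                   // cap already used: give up
            }
            result.add(current);
        }
        return result;
    }

    /** Total time in ms spent waiting on retries before giving up. */
    static long totalWaitMs(long initialMs, long maxMs) {
        long sum = 0;
        for (long d : delays(initialMs, maxMs)) {
            sum += d;
        }
        return sum;
    }

    public static void main(String[] args) {
        long initial = 100; // assumed default initial back-off in ms
        for (long max : new long[] {10_000, 60_000}) {
            System.out.printf("max=%d ms -> delays %s, total %d ms%n",
                    max, delays(initial, max), totalWaitMs(initial, max));
        }
    }
}
```

Under these assumptions, the default maximum of 10 s gives the producer about 22.7 s in total (100 + 200 + ... + 6400 + 10000 ms) to register its result partition, while a maximum of 60 s (e.g. "taskmanager.network.request-backoff.max: 60000" in flink-conf.yaml, with the value in milliseconds) stretches that to about 162 s.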

BTW, you should also check why the upstream task registers its result partition so late. Maybe the upstream TaskManager received the task deployment from the JobManager late, or some operations during the upstream task's initialization unexpectedly take longer before the result partition is registered.

Best,
Zhijiang
------------------------------------------------------------------
From: Steffen Wohlers <[hidden email]>
Sent: Sunday, 22 July 2018 22:22
To: user <[hidden email]>
Subject: Network PartitionNotFoundException when run on multi nodes

Re: Network PartitionNotFoundException when run on multi nodes

Steffen Wohlers
Hi Zhijiang, Minglei, all,

Both of your hints and explanations worked well. Thank you very much!

Thanks,
Steffen

On 23. Jul 2018, at 08:10, Zhijiang(wangzhijiang999) <[hidden email]> wrote: