issue running flink in docker

David Brelloch
Hi everyone,

We are attempting to run Flink 1.2 in a distributed, Dockerized environment and are running into issues when running jobs in parallel.

The exception we get fairly quickly after startup is:
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition d3d8404aa26bedafd77e88bdfd88375b@84037703da6706cd1017f53fd8b818cd not found.
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:204)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:129)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:331)
	at org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1244)
	at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1082)
	at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1077)
	at org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:259)
	at akka.dispatch.OnComplete.internal(Future.scala:248)
	at akka.dispatch.OnComplete.internal(Future.scala:245)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
This only occurs when running in parallel, but the exception doesn't give me a lot to go on. We have configured the following ports:
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
taskmanager.data.port: 6121

We have also mapped Docker ports 6121 and 6122 on the TaskManagers and 6123 on the JobManager.

Does anyone have any suggestions for other places to look or settings to try?

Thanks,
David



Re: issue running flink in docker

Stephan Ewen
Hi!

Could it be that some hostname or IP address mapping gets thrown off somewhere in the process?

The exception looks like the result of the following sequence:

  - The JobManager gets a message from a TaskManager that a partition is ready and notifies the other TaskManagers
  - A TaskManager gets the update message and connects to the address of the indicated TaskManager
  - That TaskManager does not have the partition

Is it possible that the JobManager and the TaskManagers see different names or addresses?

Also, is this Flink 1.2 with a DataSet job?

Stephan 
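
For readers following along, the sequence above can be sketched as a toy model. This is not Flink code: the classes, the partition ID "p1", and the addresses are all hypothetical, and the only point is to show how an address that resolves differently on the consumer's side can end in a "partition not found" error even though the partition exists.

```python
# Toy model of the lookup sequence described above; this is NOT Flink code.
# Every class, function, address, and partition ID here is hypothetical.


class PartitionNotFound(Exception):
    """Raised when a TaskManager is asked for a partition it does not hold."""


class TaskManager:
    def __init__(self, name):
        self.name = name
        self.partitions = set()

    def request_partition(self, partition_id):
        if partition_id not in self.partitions:
            raise PartitionNotFound(
                f"Partition {partition_id} not found on {self.name}."
            )
        return f"{partition_id} served by {self.name}"


def consume(partition_id, advertised_address, network):
    """Connect to whatever the advertised address resolves to for the consumer."""
    producer = network[advertised_address]
    return producer.request_partition(partition_id)


tm_a, tm_b = TaskManager("tm-a"), TaskManager("tm-b")
tm_a.partitions.add("p1")  # tm-a produced the partition

# The address under which the JobManager knows tm-a (assumed value).
advertised = "10.0.0.5"

# Consistent view: the advertised address leads to tm-a.
good_network = {"10.0.0.5": tm_a, "10.0.0.6": tm_b}
# Inconsistent view: from the consumer's side the same address leads to tm-b.
bad_network = {"10.0.0.5": tm_b, "10.0.0.6": tm_a}

print(consume("p1", advertised, good_network))
try:
    consume("p1", advertised, bad_network)
except PartitionNotFound as err:
    print(err)
```

The second call fails although nothing is wrong with the job itself, which matches the observation that the exception alone gives little to go on.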



On Wed, May 10, 2017 at 7:05 PM, David Brelloch <[hidden email]> wrote:
> We are attempting to run flink 1.2 in a distributed dockerized environment and are running into issues when running jobs in parallel. [...]

Re: issue running flink in docker

David Brelloch
Stephan,

Thanks for pointing us in the right direction on the different addresses. That was the issue.

David
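
The thread does not say what was changed to fix the addresses. One possible approach, shown here only as a sketch, is to pin the names each process uses in flink-conf.yaml so that every container sees the same ones. The helper below is hypothetical, as are the host names "jobmanager" and "taskmanager-1"; the ports are the ones from the original post, and the keys `jobmanager.rpc.address` and `taskmanager.hostname` are standard Flink configuration options.

```python
def render_flink_conf(jobmanager_address, taskmanager_hostname):
    """Render flink-conf.yaml entries that pin the names Flink uses.

    Hypothetical helper; the port values are taken from the original post.
    """
    entries = {
        "jobmanager.rpc.address": jobmanager_address,
        "jobmanager.rpc.port": 6123,
        "taskmanager.hostname": taskmanager_hostname,
        "taskmanager.rpc.port": 6122,
        "taskmanager.data.port": 6121,
    }
    return "\n".join(f"{key}: {value}" for key, value in entries.items()) + "\n"


# Assumed names: they must resolve to the same container from every other container.
print(render_flink_conf("jobmanager", "taskmanager-1"))
```

Whatever names are chosen, they only help if they resolve to the same container from inside every other container on the network.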

On Wed, May 10, 2017 at 3:03 PM, Stephan Ewen <[hidden email]> wrote:
> Can it be that some hostname / IP address mapping / etc gets thrown off somewhere in the process? [...]

Re: issue running flink in docker

Stephan Ewen
Glad to hear it!

Overlay networks (which most container infrastructures use) are tricky, and we need to add some code to make diagnosing issues easier in those cases.

Stephan
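
Until such diagnostics exist in Flink itself, a small standalone check run inside each container can help compare what the containers see. The sketch below uses only Python's standard `socket` module; it is not part of Flink, and the function names are made up for illustration.

```python
import socket


def describe_local_identity():
    """Report the names and addresses this container believes it has."""
    hostname = socket.gethostname()
    try:
        infos = socket.getaddrinfo(hostname, None)
        addresses = sorted({info[4][0] for info in infos})
    except socket.gaierror:
        addresses = []  # the hostname does not resolve locally
    return {
        "hostname": hostname,
        "fqdn": socket.getfqdn(),
        "addresses": addresses,
    }


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    print(describe_local_identity())
```

Running this in the JobManager container and in each TaskManager container, then calling something like `can_connect("taskmanager-1", 6121)` with the peer names actually in use (the name here is an assumption), should show whether the address a TaskManager registers with is also the one its peers can reach.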


On Wed, May 10, 2017 at 9:30 PM, David Brelloch <[hidden email]> wrote:
> Thanks for pointing us in the right direction on the different addresses. That was the issue. [...]