Hi everyone,
I am running some Flink experiments with the Peel benchmark (http://peel-framework.org/) and I am struggling with exceptions. The environment is a 25-node cluster with 16 cores per node. The dataset is ~80 GiB and is stored on HDFS 2.7.1. The Flink version is 1.0.3.

At first I tried a degree of parallelism of 400, but a "not enough numberOfBuffers" exception was raised, so I lowered the parallelism to 200. The Flink configuration follows:

jobmanager.rpc.address = ${runtime.hostname}
akka.log.lifecycle.events = ON
akka.ask.timeout = 300s
jobmanager.rpc.port = 6002
jobmanager.heap.mb = 1024
jobmanager.web.port = 6004
taskmanager.heap.mb = 28672
taskmanager.memory.fraction = 0.7
taskmanager.network.numberOfBuffers = 32768
taskmanager.network.bufferSizeInBytes = 16384
taskmanager.tmp.dirs = "/data/1/peel/flink/tmp:/data/2/peel/flink/tmp:/data/3/peel/flink/tmp:/data/4/peel/flink/tmp"
taskmanager.debug.memory.startLogThread = true

With a parallelism of 200, the following exception is raised on one node of the cluster:

2016-06-29 11:31:55,673 INFO  org.apache.flink.runtime.taskmanager.Task - CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43)) -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1) switched to FAILED with exception.
java.lang.IllegalStateException: Received unexpected partition state null for partition request. This is a bug.
        at org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994)

The reduce code (sGradientDescentL2.scala:43) is:

43  val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)

The map code (sGradientDescentL2.scala:68-74) is:

68  def createInitialVector(dimensionDS: DataSet[Int]): DataSet[Vector] = {
69    dimensionDS.map {
70      dimension =>
71        val values = DenseVector(Array.fill(dimension)(0.0))
72        values
73    }
74  }

I can't figure out a solution; thank you for your help.

Andrea

--
Andrea Spina
N.Tessera: 74598
MAT: 89369
Ingegneria Informatica [LM] (D.M. 270)
Hey Andrea! Sorry for the bad user experience.
Regarding the network buffers: you should be able to run the job after increasing the number of network buffers, just account for them when specifying the heap size etc. You currently allocate 32768 * 16384 bytes = 512 MB for them. If you have a very long pipeline and high parallelism, you should increase the number accordingly. How much memory do you have on your machines?

Regarding the IllegalStateException: I suspect that this is **not** the root failure cause. The null ExecutionState can only happen if the producer task (from which data is requested) failed during the request. The error message is confusing and I opened a JIRA issue to fix it: https://issues.apache.org/jira/browse/FLINK-4131. Can you please check your complete logs to see what the root cause might be, e.g. why did the producer fail?

On Wed, Jun 29, 2016 at 12:19 PM, ANDREA SPINA <[hidden email]> wrote:
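For reference, the buffer arithmetic above can be sketched in a few lines of Scala. The 512 MB figure comes straight from the thread; the `slotsPerTM^2 * numTaskManagers * 4` heuristic is the rule of thumb from the Flink configuration documentation of that era, so treat the exact factor as an assumption and check it against your version's docs:

```scala
// Back-of-the-envelope sizing for taskmanager.network.numberOfBuffers.
// The concrete values are taken from the thread; the sizing heuristic
// (slotsPerTM^2 * numTaskManagers * 4) is the documented rule of thumb,
// assumed here, not a guarantee.
object NetworkBufferSizing {
  // Total bytes reserved for the network stack out of the TaskManager heap.
  def allocatedBytes(numberOfBuffers: Long, bufferSizeInBytes: Long): Long =
    numberOfBuffers * bufferSizeInBytes

  // Heuristic minimum buffer count for an all-to-all shuffle.
  def recommendedBuffers(slotsPerTM: Int, numTaskManagers: Int): Long =
    slotsPerTM.toLong * slotsPerTM * numTaskManagers * 4

  def main(args: Array[String]): Unit = {
    // Current setting from the thread: 32768 buffers * 16 KiB = 512 MiB.
    val current = allocatedBytes(32768L, 16384L)
    println(s"current network memory: ${current / (1024 * 1024)} MiB")

    // 25 nodes with 16 slots each, as in Andrea's cluster.
    println(s"heuristic buffer count: ${recommendedBuffers(16, 25)}")
  }
}
```

Note that the heuristic covers simultaneous shuffles per slot; long chained pipelines with several concurrent exchanges need proportionally more, which is why the "not enough numberOfBuffers" error appeared at parallelism 400.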
Hi Ufuk,
the memory available is 48294 MB per node, of which I reserve 28 GiB via the Flink config file:

taskmanager.heap.mb = 28672
taskmanager.memory.fraction = 0.7
taskmanager.network.numberOfBuffers = 32768
taskmanager.network.bufferSizeInBytes = 16384

Here is what I found in the log files.

The taskmanager log (from the task manager that seems to have failed):

2016-06-29 11:31:55,673 INFO  org.apache.flink.runtime.taskmanager.Task - CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43)) -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1) switched to FAILED with exception.
java.lang.IllegalStateException: Received unexpected partition state null for partition request. This is a bug.
        at org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994)
        at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:468)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:265)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:119)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The jobmanager log:

2016-06-29 11:31:34,683 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43)) -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1) (8c2d9a0a0520c2c18e07bad0a97a3911) switched from DEPLOYING to FAILED
2016-06-29 11:31:34,694 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job 71542654d427e8d0e7e01c538abe1acf (peel-bundle-flink) changed to FAILING.
java.lang.Exception: Cannot deploy task CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43)) -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1) (8c2d9a0a0520c2c18e07bad0a97a3911) - TaskManager (c0b308245dfc5da6d759fb5bc1bc5ad0 @ cloud-12 - 16 slots - URL: akka.tcp://flink@130.149.21.16:6122/user/taskmanager) not responding after a timeout of 10000 milliseconds
        at org.apache.flink.runtime.executiongraph.Execution$2.onComplete(Execution.java:387)
        at akka.dispatch.OnComplete.internal(Future.scala:246)
        at akka.dispatch.OnComplete.internal(Future.scala:244)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@130.149.21.16:6122/user/taskmanager#1824295872]] after [10000 ms]
        at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
        at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
        at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
        at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
        at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
        at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
        at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
        at java.lang.Thread.run(Thread.java:745)

The client-${runtime.hostname}.log records the same "Cannot deploy task ... not responding after a timeout of 10000 milliseconds" exception, with the identical AskTimeoutException cause and stack trace, twice: at 11:31:34,687 when the chained Reduce -> Map task switched to FAILED, and at 11:31:34,709 when job execution switched to status FAILING.

Really appreciating your help here. :)

Cheers,
Andrea

2016-06-29 13:48 GMT+02:00 Ufuk Celebi <[hidden email]>:
OK, looks like you can easily give more memory to the network stack, e.g. for 2 GB set:

taskmanager.network.numberOfBuffers = 65536
taskmanager.network.bufferSizeInBytes = 32768

For the other exception, your logs confirm that there is something else going on. Try increasing the Akka ask timeout:

akka.ask.timeout: 100 s

Does this help?

On Wed, Jun 29, 2016 at 3:10 PM, ANDREA SPINA <[hidden email]> wrote:
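A quick sanity check of the suggested numbers against the TaskManager heap can be sketched as follows. One caveat stated as an assumption: in Flink 1.0.x the managed-memory fraction is taken from the heap remaining after the network buffers are allocated; verify that detail against your version's documentation.

```scala
// Rough memory accounting for the suggested settings. The constants are
// the values from this thread; the "fraction applies after network
// buffers" rule is an assumption about Flink 1.0.x behavior.
object TaskManagerMemoryCheck {
  val heapMb          = 28672L  // taskmanager.heap.mb
  val numberOfBuffers = 65536L  // suggested taskmanager.network.numberOfBuffers
  val bufferSizeBytes = 32768L  // suggested taskmanager.network.bufferSizeInBytes
  val managedFraction = 0.7     // taskmanager.memory.fraction

  def networkMb: Long = numberOfBuffers * bufferSizeBytes / (1024 * 1024)

  def main(args: Array[String]): Unit = {
    val managedMb = ((heapMb - networkMb) * managedFraction).toLong
    val freeMb    = heapMb - networkMb - managedMb
    println(s"network buffers: $networkMb MiB")  // 65536 * 32 KiB = 2048 MiB
    println(s"managed memory:  $managedMb MiB")
    println(s"remaining heap:  $freeMb MiB for user code")
  }
}
```

With a 28 GiB heap, reserving 2 GB for the network stack still leaves a comfortable margin, which is why the suggestion is safe to try directly.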
Other than increasing the ask timeout, we've seen such failures caused by long GC pauses over bigger heaps. In that case, you could try a) enabling object reuse, or b) enabling off-heap memory (i.e. taskmanager.memory.off-heap = true) to mitigate GC-induced issues a bit.

Hope it helps,
Martin

On Wed, Jun 29, 2016 at 3:29 PM Ufuk Celebi <[hidden email]> wrote:
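For anyone following along, option b) is a single config entry; a minimal flink-conf.yaml fragment, assuming the Flink 1.0.x option name (check your version's configuration docs):

```yaml
# flink-conf.yaml fragment: allocate Flink's managed memory off-heap,
# shrinking the garbage-collected heap and thus GC pause times.
taskmanager.memory.off-heap: true
```

Option a) is the API-side counterpart: calling `enableObjectReuse()` on the job's `ExecutionConfig` lets Flink reuse record objects instead of allocating fresh ones, at the cost of requiring that user functions do not hold on to input objects across calls.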
Hi everybody,

increasing akka.ask.timeout solved the second issue. Anyway, that timeout was a symptom of a congested network, so I worked on improving the algorithm. Increasing the numberOfBuffers and the corresponding buffer size solved the first issue, so now I can run with the full degree of parallelism. In my case, enabling off-heap memory didn't do the trick.

Thank you. All the best,

Andrea

2016-06-29 17:10 GMT+02:00 Martin Scholl <[hidden email]>: