Hi,

I have written ETL jobs in Flink (DataSet API). When I execute them in the IDE, they run and finish fine. When I try to run them on my cluster, I get an "Insufficient number of network buffers" error.

Thanks,
Tarandeep

Exception I got after I tried to run with 7500 buffers:

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: Update task on instance d4f3f517b33e5fa8a9932fc06a0aef3b @ dev-cluster-slave1 - 4 slots - URL: akka.tcp://flink@172.22.13.39:52046/user/taskmanager failed due to:
    at org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954)
    at akka.dispatch.OnFailure.internal(Future.scala:228)
    at akka.dispatch.OnFailure.internal(Future.scala:227)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
    at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
    at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    ... 2 more
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@172.22.13.39:52046/user/taskmanager#-1857397999]] after [10000 ms]
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
    at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
    at java.lang.Thread.run(Thread.java:745)

Hey Tarandeep,
I think the failures are unrelated. Regarding the number of network buffers:
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#configuring-the-network-buffers

The timeouts might occur because the task managers are pretty loaded. I would suggest increasing the Akka ask timeout via akka.ask.timeout: 100 s
(https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#distributed-coordination-via-akka)

– Ufuk

On Tue, May 3, 2016 at 6:40 AM, Tarandeep Singh <[hidden email]> wrote:
> Hi,
>
> I have written ETL jobs in Flink (DataSet API). When I execute them in the IDE,
> they run and finish fine. When I try to run them on my cluster, I get an
> "Insufficient number of network buffers" error.
>
> I have 5 machines in my cluster with 4 cores each. TaskManager is given 3GB
> each. I increased the number of buffers to 5000, but got the same error.
> When I increased it further (say 7500), I got the exception listed below.
>
> The DAG or execution plan is pretty big. What is the recommended way to run your
> jobs when the DAG becomes huge? Shall I break it into parts by calling
> execute on the execution environment in between jobs?
>
> Thanks,
> Tarandeep
>
> Exception I got after I tried to run with 7500 buffers:
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
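
A rough sizing sketch for the network buffers discussed in this thread. The Flink 1.0 configuration page linked in the reply gives a rule of thumb of slots-per-TM^2 * #TMs * 4 buffers, and the default buffer (memory segment) size is 32 KiB. The function names below are made up for illustration, and the cluster shape (5 TaskManagers with 4 slots each) is an assumption read off the thread: the original post mentions 5 machines and the stack trace shows one TaskManager with 4 slots.

```python
# Illustrative helpers only; these names are not part of any Flink API.

def recommended_network_buffers(slots_per_tm, num_task_managers, factor=4):
    """Rule of thumb from the Flink 1.0 config docs: slots^2 * TMs * 4."""
    return slots_per_tm ** 2 * num_task_managers * factor


def buffer_memory_mib(num_buffers, segment_size_bytes=32 * 1024):
    """Memory taken per TaskManager by the buffers, assuming the
    default 32 KiB segment size."""
    return num_buffers * segment_size_bytes / (1024 * 1024)


if __name__ == "__main__":
    # Assumed cluster shape: 5 TaskManagers with 4 slots each.
    print("rule-of-thumb buffers:", recommended_network_buffers(4, 5))
    # Buffer counts that were actually tried in the thread.
    for tried in (5000, 7500):
        print(tried, "buffers ->", buffer_memory_mib(tried), "MiB per TaskManager")
```

The rule of thumb is only a starting point. A very large plan with many shuffles running at the same time can need considerably more buffers, which seems consistent with what was reported here.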
Yes, you are right: the exception occurred because the task managers were heavily loaded. I checked the Ganglia metrics and CPU usage was very high. I reduced the parallelism and ran with 5000 buffers, and didn't get any exception.

Thanks,

On Tue, May 3, 2016 at 2:19 AM, Ufuk Celebi <[hidden email]> wrote:
> Hey Tarandeep,
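
For anyone landing on this thread later, the settings mentioned above could look roughly like this in flink-conf.yaml. This is a sketch assuming Flink 1.0 key names. The thread does not say what parallelism was finally used, so that value is a placeholder, and it is not stated whether the longer Akka timeout was applied in the successful run.

```yaml
# flink-conf.yaml (sketch, Flink 1.0 key names assumed)

# Buffer count that worked in this thread once parallelism was reduced.
taskmanager.network.numberOfBuffers: 5000

# Timeout suggested in the reply; the stack trace shows asks timing out after 10000 ms.
akka.ask.timeout: 100 s

# Placeholder value: the actual reduced parallelism is not given in the thread.
parallelism.default: 8
```

The TaskManagers need a restart to pick up changes to this file. Parallelism can also be set per job when submitting, for example with the -p option of flink run.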