Execution Failed (cluster setup Flink+Hadoop), Task Manager was lost/killed

Oleksandra Levchenko
Hi, 

I am running a Flink batch job on a standalone cluster (16 nodes) on top of Hadoop.
The chain looks like:

DataSet1 = env.readTextFile    // CSV file on HDFS
    .map
    .flatMap
    .groupBy
    .reduce
    .map
    .writeAsCsv                // writes DataSet1 as CSV

DataSet2 = env.readTextFile
    .map
    .flatMap

env.readCsvFile                // reads the DataSet1 CSV back
DataSet1.flatJoin(DataSet2)
    .groupBy
    .reduce
    .filter
    .count
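To make the operator semantics of this two-branch dataflow concrete, here is a small in-memory sketch that mirrors it with plain `java.util.stream` instead of Flink, so it runs without a cluster. Everything about the record contents is a hypothetical placeholder, because the actual functions are not shown above: the "key,value" line format, the flatMap (drops non-positive values), the join output (product of the two values) and the filter threshold. In Flink's Java DataSet API the join step would typically be written as `join(...).where(...).equalTo(...).with(...)` with a `FlatJoinFunction`; the sketch simply uses a hash lookup.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// In-memory sketch of the two-branch dataflow using java.util.stream, NOT Flink.
// Record format "key,value", the flatMap/join/filter logic and the threshold
// are hypothetical placeholders; the original job's functions are not shown.
public class PipelineSketch {

    // Hypothetical record type: a string key and a numeric value.
    static final class KV {
        final String key;
        final long value;
        KV(String key, long value) { this.key = key; this.value = value; }
        String key() { return key; }
        long value() { return value; }
    }

    // map: parse one "key,value" text line into a record.
    static KV parse(String line) {
        String[] parts = line.split(",");
        return new KV(parts[0].trim(), Long.parseLong(parts[1].trim()));
    }

    // flatMap: emits zero or one record per input (hypothetical: drops values <= 0).
    static Stream<KV> expand(KV kv) {
        return kv.value() > 0 ? Stream.of(kv) : Stream.empty();
    }

    // groupBy(key) + reduce(sum): one summed value per key.
    static Map<String, Long> groupAndSum(Stream<KV> records) {
        return records.collect(Collectors.groupingBy(
                KV::key, TreeMap::new, Collectors.summingLong(KV::value)));
    }

    public static long run(List<String> input1, List<String> input2, long threshold) {
        // DataSet1: readTextFile -> map -> flatMap -> groupBy -> reduce -> map -> writeAsCsv
        List<String> csvLines = groupAndSum(input1.stream()
                        .map(PipelineSketch::parse)
                        .flatMap(PipelineSketch::expand))
                .entrySet().stream()
                .map(e -> e.getKey() + "," + e.getValue())
                .collect(Collectors.toList());

        // DataSet2: readTextFile -> map -> flatMap
        List<KV> dataSet2 = input2.stream()
                .map(PipelineSketch::parse)
                .flatMap(PipelineSketch::expand)
                .collect(Collectors.toList());

        // readCsvFile: read the materialized DataSet1 back, keyed for the join.
        Map<String, Long> dataSet1 = csvLines.stream()
                .map(PipelineSketch::parse)
                .collect(Collectors.toMap(KV::key, KV::value));

        // flatJoin on key (inner join; hypothetical join output = product of values),
        // then groupBy -> reduce -> filter -> count.
        Stream<KV> joined = dataSet2.stream()
                .filter(kv -> dataSet1.containsKey(kv.key()))
                .map(kv -> new KV(kv.key(), kv.value() * dataSet1.get(kv.key())));
        return groupAndSum(joined).values().stream()
                .filter(sum -> sum >= threshold)
                .count();
    }

    public static void main(String[] args) {
        List<String> input1 = Arrays.asList("a,1", "b,2", "a,3", "c,-1");
        List<String> input2 = Arrays.asList("a,2", "b,1", "d,5", "a,1");
        System.out.println(run(input1, input2, 5)); // prints 1
    }
}
```

Here the `readCsvFile` step just re-parses the lines produced by the first branch; in the real job that round trip goes through files on HDFS.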

The job finishes successfully with an input of ~24.3 GB.
When I scale to a larger input (244.3 GB), it fails with:

java.lang.Exception: TaskManager was lost/killed: 3c0d5310e30f7c52eae95ff97bee85e2 @ grisou-46.nancy.grid5000.fr (dataPort=51359)
at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1131)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:44)
at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
at akka.actor.ActorCell.invoke(ActorCell.scala:486)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Submitting the next job after a cluster restart produces the following error, which I guess is caused by the lost TaskManager:

java.lang.IllegalStateException: Update task on TaskManager 12c673eb2681eb4f1d1cb000e561f1c5 @ grisou-48.nancy.grid5000.fr (dataPort=42326) failed due to:
at org.apache.flink.runtime.executiongraph.Execution$8.apply(Execution.java:1076)
at org.apache.flink.runtime.executiongraph.Execution$8.apply(Execution.java:1073)
at org.apache.flink.runtime.concurrent.impl.FlinkFuture$3.recover(FlinkFuture.java:201)
at akka.dispatch.Recover.internal(Future.scala:268)
at akka.dispatch.japi$RecoverBridge.apply(Future.scala:184)
at akka.dispatch.japi$RecoverBridge.apply(Future.scala:182)
at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
at scala.util.Try$.apply(Try.scala:192)
at scala.util.Failure.recover(Try.scala:216)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@...:49630/user/taskmanager#-1925734821]] after [10000 ms]
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
... 1 more
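For reference, the `10000 ms` in the AskTimeoutException appears to match Flink's default `akka.ask.timeout` of 10 s. The first trace reaches the JobManager through Akka's DeathWatch (`DeathWatch.receivedTerminated`), whose failure detection is tuned by the `akka.watch.heartbeat.*` options. If a TaskManager were alive but unresponsive (for example during long GC pauses), raising these in `flink-conf.yaml` would be one thing to try. The values below are illustrative assumptions, not tested recommendations, and they would not help if the TaskManager process actually died; its own log would show why.

```yaml
# flink-conf.yaml fragment (illustrative values only)
# Timeout for actor "ask" requests such as the task update above (default: 10 s)
akka.ask.timeout: 60 s
# Heartbeat interval and tolerated heartbeat pause of the Akka death watch
# that decides when a TaskManager is considered lost
akka.watch.heartbeat.interval: 10 s
akka.watch.heartbeat.pause: 120 s
```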


According to the web monitor, there are 0 finished tasks after the DataSource -> Map -> Map -> Map -> FlatMap -> Reduce chain.
It also looks like the lost/killed TaskManager is not restarting.

Thank you for any help on this issue,

Regards  
Oleksandra