Hi,
I'm running a Flink batch job that reads almost 1 TB of data from S3 and then performs operations on it. A list of filenames is distributed among the TMs, and each TM reads its subset of files from S3. The job errors out at the read step with the following error:

java.lang.Exception: TaskManager was lost/killed

Having read similar questions on the mailing list, this seems to be a memory issue, with full GC at the TM causing the TM to be lost. After enabling memory debugging, these seem to be the stats just before erroring out:

Memory usage stats: [HEAP: 8327/18704/18704 MB, NON HEAP: 79/81/-1 MB (used/committed/max)]
Direct memory stats: Count: 5236, Total Capacity: 17148907, Used Memory: 17148908
Off-heap pool stats: [Code Cache: 25/27/240 MB (used/committed/max)], [Metaspace: 47/48/-1 MB (used/committed/max)], [Compressed Class Space: 5/5/1024 MB (used/committed/max)]
Garbage collector stats: [G1 Young Generation, GC TIME (ms): 16712, GC COUNT: 290], [G1 Old Generation, GC TIME (ms): 689, GC COUNT: 2]

I tried all of the suggested fixes: decreased taskmanager.memory.fraction to leave more memory for user code, increased the number of JVMs (parallelism), and switched to the G1 GC for better GC performance, but my job still errors out.

I also increased akka.watch.heartbeat.pause, akka.watch.threshold, and akka.watch.heartbeat.interval to prevent the timeout caused by GC pauses, but that doesn't help either. I figured that with really high values for the death watch the program would run very slowly but complete at some point; it fails anyway.

I'm now trying to decrease object creation in my program, but so far that hasn't helped.

How can I go about debugging and fixing this problem?

Thank you.
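For reference, the settings mentioned above all live in flink-conf.yaml. A minimal sketch with illustrative values only (not the exact values used in this job):

taskmanager.memory.fraction: 0.5      # default 0.7; a lower value leaves more heap to user functions
akka.watch.heartbeat.interval: 30 s   # default 10 s
akka.watch.heartbeat.pause: 600 s     # default 60 s; should be well above the interval
akka.watch.threshold: 12              # default 12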
Hi,

Did you try running your pipeline with the RocksDB state backend? Are you managing state in the pipeline or using windowing?

Direct memory stats: Count: 5236, Total Capacity: 17148907, Used Memory: 17148908

From the stats above it seems you are running out of memory, which is why the TM got killed. I have experienced a similar issue of TMs getting killed frequently, or the job not progressing fast, because of full GCs; moving to RocksDB solved the issue.

Regards,
Vinay Patil
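For completeness, switching to the RocksDB state backend is a small change, but it applies to streaming jobs that keep checkpointed state; it would not affect a DataSet (batch) job like the one described above. A minimal sketch, assuming the flink-statebackend-rocksdb dependency is on the classpath and using an illustrative checkpoint URI:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Keeps working state on local disk and checkpoints to the given URI,
        // so large state no longer has to fit on the JVM heap.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
        // ... build and execute the streaming job here ...
    }
}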
Late response, but a common reason for disappearing TaskManagers is that the process gets terminated by the Linux out-of-memory killer; the usual recommendation in that case is to decrease the allotted memory so the process stays within the machine's limits.
> On Sep 5, 2017, at 9:09 AM, ShB <[hidden email]> wrote:
>
> [original message quoted in full; see the first post in this thread]
Thanks for your response!
Recommendation to decrease allotted memory? Which allotted memory should be decreased? I tried decreasing taskmanager.memory.fraction to leave more memory for user code, but that doesn't work beyond a point. I also tried increasing containerized.heap-cutoff-ratio, and that didn't work either. What eventually solved the problem was increasing parallelism - throwing in many more task managers.
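For reference, that cutoff is also a flink-conf.yaml setting; it reserves a fraction of the YARN container's memory for non-heap usage before the TaskManager heap size is computed. The value below is illustrative only (the default in the 1.x series is 0.25):

containerized.heap-cutoff-ratio: 0.35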
Hi!
The garbage collection stats actually look okay, not terribly bad - I'm almost surprised that this seems to cause failures.

Can you check whether you find messages in the TM / JM logs about heartbeat timeouts, or about actor systems being "gated" or "quarantined"?

It would also be interesting to know how the program is actually set up - where does the data from the files you read go? Do you just keep it as objects in lists, or do you emit the records from the operators?

Best,
Stephan
Hi Stephan,
Thanks for your response!

TaskManager lost/killed has been a recurring problem I've had with Flink for the last few months, as I try to scale to larger and larger amounts of data. I would be very grateful for some help figuring out how I can avoid this.

The program is set up something like this:

DataSet<CustomType> data = env.fromCollection(listOfFiles)
    .rebalance()
    .flatMap(new ReadFiles())
    .filter(new FilterData());

DataSet<Tuple8> computation1 = data
    .map(new Compute1())
    .distinct()
    .map(new Compute2())
    .groupBy(0, 1, 2)
    .aggregate(SUM, 3).and(SUM, 4).and(SUM, 5);

DataSet<Tuple10> computation2 = data
    .map(new Compute3())
    .distinct()
    .map(new Compute4())
    .groupBy(0, 1, 2)
    .aggregate(SUM, 3).and(SUM, 4).and(SUM, 5);

DataSet<Tuple12> finalOP = computation1.join(computation2)
    .where(0, 1)
    .equalTo(0, 1)
    .with(new Join1())
    .sortPartition(0, Order.ASCENDING)
    .setParallelism(1);

finalOP.writeAsCsv("s3://myBucket/myKey.csv");

---

public static final class ReadFiles implements FlatMapFunction<String, CustomType> {
    @Override
    public void flatMap(String fileName, Collector<CustomType> out) throws Exception {
        S3FileReaderAndParser parser = new S3FileReaderAndParser(fileName);
        List<CustomType> dataList = parser.parseFiles();
        for (CustomType data : dataList) {
            out.collect(data);
        }
    }
}

The TaskManager is killed/lost during the ReadFiles() flatMap. ReadFiles is a flatMap function that reads each of the files from S3 using the AWS S3 Java SDK, then parses and emits each of the protobufs.

And yes, I can find a message like this in the logs about "gated" systems:

2017-10-12 20:46:00,355 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@ip-172-31-8-29:38763] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@ip-172-31-8-29:38763]] Caused by: [Connection refused: ip-172-31-8-29/172.31.8.29:38763]

Thank you!
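One pattern that can reduce heap pressure in a reader like this - not the poster's actual code - is to emit records as they are parsed instead of collecting the whole file into a List first, since parseFiles() keeps every record of a file on the heap at once. The sketch below assumes a hypothetical parseLazily() method returning an Iterator, which the real S3FileReaderAndParser may not offer:

import java.util.Iterator;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public static final class ReadFilesStreaming implements FlatMapFunction<String, CustomType> {
    @Override
    public void flatMap(String fileName, Collector<CustomType> out) throws Exception {
        S3FileReaderAndParser parser = new S3FileReaderAndParser(fileName);
        // Hypothetical iterator-based API: parses one protobuf at a time from the S3 stream
        // instead of buffering the entire file's records in a List.
        Iterator<CustomType> records = parser.parseLazily();
        while (records.hasNext()) {
            // Each record becomes eligible for GC once downstream operators are done with it,
            // keeping per-file memory roughly constant.
            out.collect(records.next());
        }
    }
}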
Hi Stephan,
Apologies, I hit send too soon on the last email.

While trying to debug this, I ran it multiple times on different instance types (to increase the RAM available), and while digging into the logs I found this to be the error in the task manager logs:

java.lang.RuntimeException: Emitting the record caused an I/O exception: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.BufferWriteRequest@12b3c49e
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.processing.core.ProcessingJob$ReadFiles.flatMap(ProcessingJob.java:104)
    at org.processing.core.ProcessingJob$ReadFiles.flatMap(ProcessingJob.java:89)
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:90)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.BufferWriteRequest@12b3c49e
    at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
    at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter.writeBlock(AsynchronousBufferFileWriter.java:36)
    at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter.writeBlock(AsynchronousBufferFileWriter.java:26)
    at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.add(SpillableSubpartition.java:111)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.add(ResultPartition.java:278)
    at org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter.writeBuffer(ResultPartitionWriter.java:72)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.writeAndClearBuffer(RecordWriter.java:223)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:121)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89)
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    ... 13 more

Any idea on a fix for this issue? I can't seem to find any further information on it in the mailing list archives.

Thank you.
On further investigation, it seems to me the I/O exception I posted previously is not the cause of the TM being lost; it's the after-effect of the TM being shut down and the channel being closed after a record is emitted but before it's processed. Previously the logs didn't throw up this error, and I'm also unable to reproduce it each time (I've come across the I/O exception twice so far). Most of the time, the logs don't have the I/O exception or any other exception/error messages.

This is what the logs usually (without the I/O exception) look like:

Job Manager:

2017-10-12 22:22:41,857 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Container container_1507845873691_0001_01_000008 failed. Exit status: -100
2017-10-12 22:22:41,858 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Diagnostics for container container_1507845873691_0001_01_000008 in state COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 22:22:41,858 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Total number of failed containers so far: 1
2017-10-12 22:22:41,858 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager container with 22000 megabytes memory. Pending requests: 1
2017-10-12 22:22:42,096 INFO org.apache.flink.yarn.YarnJobManager - Task manager akka.tcp://flink@ip-172-31-43-115:43404/user/taskmanager terminated.
2017-10-12 22:22:42,210 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Partition -> FlatMap (FlatMap at main(FindOutput.java:83)) -> Filter (Filter at main(FindOutput.java:85)) (39/96) (530ca4789a921cab363f241176dac7a8) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager was lost/killed: container_1507845873691_0001_01_000008 @ ip-172-31-43-115.us-west-2.compute.internal (dataPort=40747)
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
    at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
    at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
    at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:474)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:100)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:103)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-10-12 22:22:42,451 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Output (0c45ba62b56fefd1c1e7bfd68923411d) switched from state RUNNING to FAILING.
java.lang.Exception: TaskManager was lost/killed: container_1507845873691_0001_01_000008 @ ip-172-31-43-115.us-west-2.compute.internal (dataPort=40747)
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
    at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
    at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
    at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:474)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:100)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:103)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-10-12 22:22:42,907 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Partition -> FlatMap (FlatMap at main(FindOutput.java:83)) -> Filter (Filter at main(FindOutput.java:85)) (1/96) (8cf2869e9786809d1b9b9d12b9467e40) switched from RUNNING to CANCELING.

Task Manager:

2017-10-12 22:22:38,570 INFO org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 9950, GC COUNT: 369], [G1 Old Generation, GC TIME (ms): 688, GC COUNT: 2]
2017-10-12 22:22:38,631 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 5973/17600/17600 MB, NON HEAP: 72/73/-1 MB (used/committed/max)]
2017-10-12 22:22:38,631 INFO org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 3150, Total Capacity: 17138363, Used Memory: 17138364
2017-10-12 22:22:38,631 INFO org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 18/18/240 MB (used/committed/max)], [Metaspace: 47/48/-1 MB (used/committed/max)], [Compressed Class Space: 5/6/1024 MB (used/committed/max)]
2017-10-12 22:22:38,631 INFO org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 9950, GC COUNT: 369], [G1 Old Generation, GC TIME (ms): 688, GC COUNT: 2]
2017-10-12 22:22:38,691 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 6101/17600/17600 MB, NON HEAP: 72/73/-1 MB (used/committed/max)]
2017-10-12 22:22:38,691 INFO org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 3234, Total Capacity: 17139035, Used Memory: 17139036
2017-10-12 22:22:38,691 INFO org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 18/18/240 MB (used/committed/max)], [Metaspace: 47/48/-1 MB (used/committed/max)], [Compressed Class Space: 5/6/1024 MB (used/committed/max)]
2017-10-12 22:22:38,691 INFO org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 9950, GC COUNT: 369], [G1 Old Generation, GC TIME (ms): 688, GC COUNT: 2]
*2017-10-12 22:22:38,709 INFO org.apache.flink.yarn.YarnTaskManagerRunner - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.*
2017-10-12 22:22:38,713 INFO org.apache.flink.runtime.blob.BlobCache - Shutting down BlobCache
2017-10-12 22:22:38,719 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager removed spill file directory /mnt/yarn/usercache/hadoop/appcache/application_1507845873691_0001/flink-io-a5aace05-73ed-4cea-ad07-db86f9f8ce21
2017-10-12 22:22:38,719 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager removed spill file directory /mnt1/yarn/usercache/hadoop/appcache/application_1507845873691_0001/flink-io-cb34ffbe-879f-47d4-9df3-6ed2b0dcd799

This is what the logs sometimes (with the I/O exception) look like:

Job Manager:

2017-10-12 19:40:37,669 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@ip-172-31-11-129:43340] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
2017-10-12 19:40:37,922 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Container container_1507836035753_0001_01_000015 failed. Exit status: -100
2017-10-12 19:40:37,922 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Diagnostics for container container_1507836035753_0001_01_000015 in state COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 19:40:37,922 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Total number of failed containers so far: 1
2017-10-12 19:40:37,923 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Container container_1507836035753_0001_01_000013 failed. Exit status: -100
2017-10-12 19:40:37,923 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Diagnostics for container container_1507836035753_0001_01_000013 in state COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 19:40:37,923 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Total number of failed containers so far: 2
2017-10-12 19:40:37,923 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Container container_1507836035753_0001_01_000002 failed. Exit status: -100
2017-10-12 19:40:37,923 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Diagnostics for container container_1507836035753_0001_01_000002 in state COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 19:40:37,923 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Total number of failed containers so far: 3
2017-10-12 19:40:37,923 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Container container_1507836035753_0001_01_000003 failed. Exit status: -100
2017-10-12 19:40:37,923 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Diagnostics for container container_1507836035753_0001_01_000003 in state COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 19:40:37,923 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Total number of failed containers so far: 4
2017-10-12 19:40:37,923 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager container with 22000 megabytes memory. Pending requests: 1
2017-10-12 19:40:37,923 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager container with 22000 megabytes memory. Pending requests: 2
2017-10-12 19:40:37,924 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager container with 22000 megabytes memory. Pending requests: 3
2017-10-12 19:40:37,924 INFO org.apache.flink.yarn.YarnJobManager - Task manager akka.tcp://flink@ip-172-31-1-178:33620/user/taskmanager terminated.
2017-10-12 19:40:37,924 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager container with 22000 megabytes memory. Pending requests: 4
2017-10-12 19:40:37,925 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Partition -> FlatMap (FlatMap at main(FindOutput.java:83)) -> Filter (Filter at main(FindOutput.java:87)) (40/136) (748d815623ff13e6357f351d5aa7b0f4) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager was lost/killed: container_1507836035753_0001_01_000015 @ ip-172-31-1-178.us-west-2.compute.internal (dataPort=35861)
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
    at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
    at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
    at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:474)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:100)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:103)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-10-12 19:40:37,931 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Output (11771c44eace0a1e32de1c3ca1c60b09) switched from state RUNNING to FAILING.
java.lang.Exception: TaskManager was lost/killed: container_1507836035753_0001_01_000015 @ ip-172-31-1-178.us-west-2.compute.internal (dataPort=35861)
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
    at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
    at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
    at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:474)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:100)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:103)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Task Manager:

2017-10-12 19:40:34,959 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 1347/17600/17600 MB, NON HEAP: 73/74/-1 MB (used/committed/max)]
2017-10-12 19:40:34,959 INFO org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 162, Total Capacity: 17111387, Used Memory: 17111388
2017-10-12 19:40:34,959 INFO org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 19/19/240 MB (used/committed/max)], [Metaspace: 48/49/-1 MB (used/committed/max)], [Compressed Class Space: 5/6/1024 MB (used/committed/max)]
2017-10-12 19:40:34,959 INFO org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 8020, GC COUNT: 363], [G1 Old Generation, GC TIME (ms): 695, GC COUNT: 2]
2017-10-12 19:40:35,019 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 1467/17600/17600 MB, NON HEAP: 73/74/-1 MB (used/committed/max)]
2017-10-12 19:40:35,019 INFO org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 196, Total Capacity: 17111659, Used Memory: 17111660
2017-10-12 19:40:35,019 INFO org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 19/19/240 MB (used/committed/max)], [Metaspace: 48/49/-1 MB (used/committed/max)], [Compressed Class Space: 5/6/1024 MB (used/committed/max)]
2017-10-12 19:40:35,019 INFO org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 8020, GC COUNT: 363], [G1 Old Generation, GC TIME (ms): 695, GC COUNT: 2]
*2017-10-12 19:40:35,033 INFO org.apache.flink.yarn.YarnTaskManagerRunner - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.*
2017-10-12 19:40:35,043 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: CHAIN Partition -> FlatMap (FlatMap at main(FindOutput.java:83)) -> Filter (Filter at main(FindOutput.java:87)) (86/136)
java.lang.RuntimeException: Emitting the record caused an I/O exception: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.BufferWriteRequest@21d84696
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.processing.core.ProcessingJob$ReadFiles.flatMap(ProcessingJob.java:104)
    at org.processing.core.ProcessingJob$ReadFiles.flatMap(ProcessingJob.java:89)
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:90)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.BufferWriteRequest@21d84696
    at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
    at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter.writeBlock(AsynchronousBufferFileWriter.java:36)
    at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter.writeBlock(AsynchronousBufferFileWriter.java:26)
    at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.add(SpillableSubpartition.java:111)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.add(ResultPartition.java:278)
    at org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter.writeBuffer(ResultPartitionWriter.java:72)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.writeAndClearBuffer(RecordWriter.java:223)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:121)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89)
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    ... 13 more
2017-10-12 19:40:35,050 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager removed spill file directory /mnt/yarn/usercache/hadoop/appcache/application_1507836035753_0001/flink-io-81d98a3a-7a40-438f-93fa-3b1f9dfc1e1d

I still can't figure out why the TM shuts down and how to avoid it - it seems like a memory/GC issue. I was previously able to get the job to complete by increasing parallelism (the number of task managers), but as my dataset size has increased I'm running into this issue again, and increasing parallelism is not working.

Any help would be greatly appreciated!

Thanks
I just wanted to leave an update about this issue, for anyone else who might come across it.

The problem was with memory, but it was disk space rather than heap/off-heap memory. YARN was killing my containers because they exceeded the threshold for disk utilization, and this manifested as "TaskManager was lost/killed" or "JobClientActorConnectionTimeoutException: Lost connection to the JobManager". Digging into the individual instances' node manager logs provided the hints that it was a disk issue.

Some fixes for this problem:
- yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage can be increased to alleviate the problem temporarily.
- Increasing the disk capacity on each task manager is a more long-term fix.
- Increasing the number of task managers increases the total available disk space and hence is also a fix.

Thanks!
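For reference, that node manager threshold is a yarn-site.xml property (the stock Hadoop default is 90.0); the value below is illustrative only:

<property>
  <name>yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage</name>
  <value>95.0</value>
</property>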
Thanks for the heads-up and for explaining how you resolved the issue!

Best,
Fabian