Task Manager was lost/killed due to full GC


ShB
Hi,

I'm running a Flink batch job that reads almost 1 TB of data from S3 and
then performs operations on it. A list of filenames is distributed among
the TMs, and each TM reads its subset of files from S3. The job fails at
the read step with the following error:
java.lang.Exception: TaskManager was lost/killed

Having read similar questions on the mailing list, it seems like this is a
memory issue, with full GC at the TM causing the TM to be lost.

After enabling memory debugging, these are the stats logged just before the
failure:
Memory usage stats: [HEAP: 8327/18704/18704 MB, NON HEAP: 79/81/-1 MB (used/committed/max)]
Direct memory stats: Count: 5236, Total Capacity: 17148907, Used Memory: 17148908
Off-heap pool stats: [Code Cache: 25/27/240 MB (used/committed/max)], [Metaspace: 47/48/-1 MB (used/committed/max)], [Compressed Class Space: 5/5/1024 MB (used/committed/max)]
Garbage collector stats: [G1 Young Generation, GC TIME (ms): 16712, GC COUNT: 290], [G1 Old Generation, GC TIME (ms): 689, GC COUNT: 2]

I tried all of these suggested fixes: decreased taskmanager.memory.fraction
to give more memory to user managed operations, increased the number of
JVMs (parallelism), and used the G1 GC for better GC performance, but my job
still errors out.

I also increased akka.watch.heartbeat.pause, akka.watch.threshold, and
akka.watch.heartbeat.interval to prevent the timeout due to GC, but this
doesn't help either. I figured that with really high values for the death
watch, the program would run very slowly but complete at some point; it
fails anyway.

I'm now trying to decrease object creation in my program, but so far it
hasn't helped.

How can I go about debugging and fixing this problem?

Thank you.
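
For readers who want to see where numbers like the ones above come from: the JDK exposes heap, direct-buffer, and GC counters through its standard management beans. The following is a minimal standalone sketch, not Flink's own logging code; the class name MemoryStatsProbe and the output format are assumptions made for illustration. It prints comparable figures for whichever JVM it runs in.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Hypothetical helper, not part of Flink: prints heap, direct-buffer and GC
// counters for the current JVM using only the standard management beans.
final class MemoryStatsProbe {

    // Heap usage as used/committed/max, converted from bytes to MB.
    static String heapStats() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        return String.format("HEAP: %d/%d/%d MB (used/committed/max)",
                heap.getUsed() >> 20, heap.getCommitted() >> 20, heap.getMax() >> 20);
    }

    // Direct buffer pool statistics; the JVM reports capacity and usage in bytes.
    static String directStats() {
        for (BufferPoolMXBean pool
                : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return String.format(
                        "Direct memory: count=%d, totalCapacity=%d bytes, used=%d bytes",
                        pool.getCount(), pool.getTotalCapacity(), pool.getMemoryUsed());
            }
        }
        return "Direct memory: n/a";
    }

    // Cumulative collection time and collection count per garbage collector.
    static String gcStats() {
        StringBuilder sb = new StringBuilder();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            sb.append(String.format("[%s, GC TIME (ms): %d, GC COUNT: %d] ",
                    gc.getName(), gc.getCollectionTime(), gc.getCollectionCount()));
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(heapStats());
        System.out.println(directStats());
        System.out.println(gcStats());
    }
}
```

Note that the GC figures are cumulative since JVM start, so a single snapshot says little about individual pauses; comparing two snapshots taken some seconds apart is usually more informative.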




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Task Manager was lost/killed due to full GC

Vinay Patil
Hi,

Did you try running your pipeline with the RocksDB state backend? Are you managing state in the pipeline, or using windowing?

Direct memory stats: Count: 5236, Total Capacity: 17148907, Used Memory:
17148908


From the above stats, it seems you are running out of memory, which is why the TM got killed.

I have experienced a similar issue, with TMs getting killed frequently or the job not progressing quickly because of full GCs. Moving to RocksDB solved the issue.


Regards,
Vinay Patil

Re: Task Manager was lost/killed due to full GC

Greg Hogan
In reply to this post by ShB
Late response, but a common reason for disappearing TaskManagers is termination by the Linux out-of-memory (OOM) killer; the usual recommendation in that case is to decrease the allotted memory.


Re: Task Manager was lost/killed due to full GC

ShB
Thanks for your response!

Recommendation to decrease allotted memory? Which allotted memory should be
decreased?

I tried decreasing taskmanager.memory.fraction to give more memory to user
managed operations, but that doesn't work beyond a point. I also tried
increasing containerized.heap-cutoff-ratio; that didn't work either.

What eventually solved the problem was increasing parallelism - throwing in
many more task managers.



Re: Task Manager was lost/killed due to full GC

Stephan Ewen
Hi!

The garbage collection stats actually look okay, not terribly bad - I'm almost surprised that this seems to cause failures.

Can you check whether you find messages in the TM / JM logs about heartbeat timeouts, or about actor systems being "gated" or "quarantined"?

It would also be interesting to know how the program is actually set up - where does the data from the files you read go?
Do you just keep the records as objects in lists, or do you emit them from the operators?

Best,
Stephan
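
To put numbers on the stats quoted in the original post, here is a small worked calculation. It assumes the direct-memory figures are in bytes (which is how the JVM's buffer pool bean reports them) and uses only values from the original post; the class name GcStatsArithmetic is made up for illustration.

```java
// Worked arithmetic on the figures from the original post.
final class GcStatsArithmetic {

    // Average time per collection in milliseconds.
    static double avgPauseMs(long totalGcTimeMs, long gcCount) {
        return (double) totalGcTimeMs / gcCount;
    }

    // Share of the maximum heap that is in use, as a percentage.
    static double heapUsedPercent(long usedMb, long maxMb) {
        return 100.0 * usedMb / maxMb;
    }

    // Converts a byte count to MiB.
    static double bytesToMiB(long bytes) {
        return bytes / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        // G1 Young Generation: 16712 ms over 290 collections.
        System.out.printf("Young GC avg pause: %.1f ms%n", avgPauseMs(16712, 290));
        // G1 Old Generation: 689 ms over 2 collections.
        System.out.printf("Old GC avg pause:   %.1f ms%n", avgPauseMs(689, 2));
        // HEAP: 8327 MB used of 18704 MB max.
        System.out.printf("Heap in use:        %.1f %%%n", heapUsedPercent(8327, 18704));
        // Direct memory: 17148908 bytes used (unit assumed to be bytes).
        System.out.printf("Direct memory:      %.2f MiB%n", bytesToMiB(17148908L));
    }
}
```

Under these assumptions the young-generation collections average roughly 58 ms each, the two old-generation collections roughly 345 ms each, the heap is a little under half full, and direct memory is about 16 MiB.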


Re: Task Manager was lost/killed due to full GC

ShB
Hi Stephan,

Thanks for your response!

Task manager lost/killed has been a recurring problem I've had with Flink
for the last few months, as I try to scale to larger and larger amounts of
data. I would be very grateful for some help figuring out how I can avoid
this.

The program is set up something like this:
/
// SUM is assumed to be a static import of
// org.apache.flink.api.java.aggregation.Aggregations.SUM.
DataSet<CustomType> data = env.fromCollection(listOfFiles)
        .rebalance()
        .flatMap(new ReadFiles())
        .filter(new FilterData());

DataSet<Tuple8> computation1 = data
        .map(new Compute1())
        .distinct()
        .map(new Compute2())
        .groupBy(0, 1, 2)
        .aggregate(SUM, 3).and(SUM, 4).and(SUM, 5);

DataSet<Tuple10> computation2 = data
        .map(new Compute3())
        .distinct()
        .map(new Compute4())
        .groupBy(0, 1, 2)
        .aggregate(SUM, 3).and(SUM, 4).and(SUM, 5);

DataSet<Tuple12> finalOP = computation1.join(computation2)
        .where(0, 1)
        .equalTo(0, 1)
        .with(new Join1())
        .sortPartition(0, Order.ASCENDING)
        .setParallelism(1);

finalOP.writeAsCsv("s3://myBucket/myKey.csv");

---

public static final class ReadFiles implements FlatMapFunction<String, CustomType> {
    @Override
    public void flatMap(String fileName, Collector<CustomType> out) throws Exception {
        // Reads and parses one S3 file; the whole file is held in a list
        // before any record is emitted.
        S3FileReaderAndParser parser = new S3FileReaderAndParser(fileName);
        List<CustomType> dataList = parser.parseFiles();
        for (CustomType data : dataList) {
            out.collect(data);
        }
    }
}
/

The Task Manager is killed/lost during the ReadFiles flatMap. ReadFiles is a
flatMap function that reads each file from S3 using the AWS S3 Java SDK,
then parses and emits each of the protobufs.
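
One thing the ReadFiles snippet makes visible is that parseFiles() returns a full List<CustomType> per file before anything is emitted, so a whole file's worth of objects sits on the heap at once. A possible alternative, sketched below, is to emit each record as soon as it is parsed. This is a standalone illustration, not the poster's code: RecordSink stands in for Flink's Collector, the input is a plain line-oriented reader rather than protobufs from S3, and all names are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

final class StreamingEmit {

    // Hypothetical stand-in for Flink's Collector<T>, so the sketch runs without Flink.
    interface RecordSink<T> {
        void collect(T record);
    }

    // Reads one record at a time and hands it to the sink immediately, so this
    // method never holds more than the current record. Returns the record count.
    static long emitAll(Reader source, RecordSink<String> out) {
        long emitted = 0;
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.isEmpty()) {
                    continue; // skip blank records
                }
                out.collect(line);
                emitted++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        long n = emitAll(new StringReader("a\nb\n\nc\n"), seen::add);
        System.out.println(n + " records emitted: " + seen);
    }
}
```

Whether this helps in the original job depends on whether S3FileReaderAndParser can expose records incrementally, which the thread does not say.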

And yes, I can find a message like this in the logs about "gated" systems:
2017-10-12 20:46:00,355 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@ip-172-31-8-29:38763] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@ip-172-31-8-29:38763]] Caused by: [Connection refused: ip-172-31-8-29/172.31.8.29:38763]

Thank you!




Re: Task Manager was lost/killed due to full GC

ShB
Hi Stephan,

Apologies, I hit send too soon on the last email.

So, while trying to debug this, I ran the job multiple times on different
instance types (to increase the available RAM). While digging into the logs,
I found this error in the task manager logs:

/
java.lang.RuntimeException: Emitting the record caused an I/O exception: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.BufferWriteRequest@12b3c49e
        at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
        at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
        at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
        at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at org.processing.core.ProcessingJob$ReadFiles.flatMap(ProcessingJob.java:104)
        at org.processing.core.ProcessingJob$ReadFiles.flatMap(ProcessingJob.java:89)
        at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
        at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:90)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.BufferWriteRequest@12b3c49e
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter.writeBlock(AsynchronousBufferFileWriter.java:36)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter.writeBlock(AsynchronousBufferFileWriter.java:26)
        at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.add(SpillableSubpartition.java:111)
        at org.apache.flink.runtime.io.network.partition.ResultPartition.add(ResultPartition.java:278)
        at org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter.writeBuffer(ResultPartitionWriter.java:72)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.writeAndClearBuffer(RecordWriter.java:223)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:121)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89)
        at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
        ... 13 more
/

Any idea on a fix for this issue? I can't seem to find any further
information on this in the mailing lists.

Thank you.





Re: Task Manager was lost/killed due to full GC

ShB
On further investigation, it seems to me that the I/O exception I posted
previously is not the cause of the TM being lost. It's an after-effect of
the TM being shut down and the channel being closed after a record is
emitted but before it's processed.

Previously, the logs didn't show this error, and I'm also unable to
reproduce it each time (I've come across the I/O exception twice so far).
Most of the time, the logs don't contain the I/O exception or any other
exception/error messages.

This is what the logs usually look like (without the I/O exception):
Job Manager:
/
2017-10-12 22:22:41,857 INFO  org.apache.flink.yarn.YarnFlinkResourceManager - Container container_1507845873691_0001_01_000008 failed. Exit status: -100
2017-10-12 22:22:41,858 INFO  org.apache.flink.yarn.YarnFlinkResourceManager - Diagnostics for container container_1507845873691_0001_01_000008 in state COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 22:22:41,858 INFO  org.apache.flink.yarn.YarnFlinkResourceManager - Total number of failed containers so far: 1
2017-10-12 22:22:41,858 INFO  org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager container with 22000 megabytes memory. Pending requests: 1
2017-10-12 22:22:42,096 INFO  org.apache.flink.yarn.YarnJobManager - Task manager akka.tcp://flink@ip-172-31-43-115:43404/user/taskmanager terminated.
2017-10-12 22:22:42,210 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Partition -> FlatMap (FlatMap at main(FindOutput.java:83)) -> Filter (Filter at main(FindOutput.java:85)) (39/96) (530ca4789a921cab363f241176dac7a8) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager was lost/killed: container_1507845873691_0001_01_000008 @ ip-172-31-43-115.us-west-2.compute.internal (dataPort=40747)
        at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
        at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
        at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
        at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
        at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:474)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:100)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:103)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-10-12 22:22:42,451 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Output (0c45ba62b56fefd1c1e7bfd68923411d) switched from state RUNNING to FAILING.
java.lang.Exception: TaskManager was lost/killed: container_1507845873691_0001_01_000008 @ ip-172-31-43-115.us-west-2.compute.internal (dataPort=40747)
        at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
        at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
        at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
        at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
        at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:474)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:100)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:103)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-10-12 22:22:42,907 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Partition -> FlatMap (FlatMap at main(FindOutput.java:83)) -> Filter (Filter at main(FindOutput.java:85)) (1/96) (8cf2869e9786809d1b9b9d12b9467e40) switched from RUNNING to CANCELING.
/
 
Task Manager:
/
2017-10-12 22:22:38,570 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 9950, GC COUNT: 369], [G1 Old Generation, GC TIME (ms): 688, GC COUNT: 2]
2017-10-12 22:22:38,631 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 5973/17600/17600 MB, NON HEAP: 72/73/-1 MB (used/committed/max)]
2017-10-12 22:22:38,631 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 3150, Total Capacity: 17138363, Used Memory: 17138364
2017-10-12 22:22:38,631 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 18/18/240 MB (used/committed/max)], [Metaspace: 47/48/-1 MB (used/committed/max)], [Compressed Class Space: 5/6/1024 MB (used/committed/max)]
2017-10-12 22:22:38,631 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 9950, GC COUNT: 369], [G1 Old Generation, GC TIME (ms): 688, GC COUNT: 2]
2017-10-12 22:22:38,691 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 6101/17600/17600 MB, NON HEAP: 72/73/-1 MB (used/committed/max)]
2017-10-12 22:22:38,691 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 3234, Total Capacity: 17139035, Used Memory: 17139036
2017-10-12 22:22:38,691 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 18/18/240 MB (used/committed/max)], [Metaspace: 47/48/-1 MB (used/committed/max)], [Compressed Class Space: 5/6/1024 MB (used/committed/max)]
2017-10-12 22:22:38,691 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 9950, GC COUNT: 369], [G1 Old Generation, GC TIME (ms): 688, GC COUNT: 2]
*2017-10-12 22:22:38,709 INFO  org.apache.flink.yarn.YarnTaskManagerRunner - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.*
2017-10-12 22:22:38,713 INFO  org.apache.flink.runtime.blob.BlobCache - Shutting down BlobCache
2017-10-12 22:22:38,719 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager removed spill file directory /mnt/yarn/usercache/hadoop/appcache/application_1507845873691_0001/flink-io-a5aace05-73ed-4cea-ad07-db86f9f8ce21
2017-10-12 22:22:38,719 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager removed spill file directory /mnt1/yarn/usercache/hadoop/appcache/application_1507845873691_0001/flink-io-cb34ffbe-879f-47d4-9df3-6ed2b0dcd799
/
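
The logs above show a 22000 MB container request on the Job Manager side and a 17600 MB maximum heap on the Task Manager side. One plausible reading, sketched below, is that the heap is the container size minus a cutoff reserved for non-heap memory, where the cutoff is the larger of a fixed minimum and a fraction of the container size (the containerized.heap-cutoff-ratio setting mentioned earlier in the thread). The formula, the 600 MB minimum, and the 0.2 ratio are assumptions chosen because they reproduce the logged numbers; none of them is stated in the thread, and the class name is made up.

```java
// Hypothetical illustration of how a container size might map to a JVM heap size.
final class ContainerHeapEstimate {

    // Heap = container size minus cutoff, where the cutoff is the larger of a
    // fixed minimum and a fraction of the container size. All sizes are in MB.
    static long heapMb(long containerMb, double cutoffRatio, long minCutoffMb) {
        long cutoff = Math.max(minCutoffMb, (long) (containerMb * cutoffRatio));
        return containerMb - cutoff;
    }

    public static void main(String[] args) {
        // 22000 MB is the container size from the Job Manager log;
        // the ratio 0.2 and the 600 MB minimum are assumed values.
        System.out.println("Estimated heap: " + heapMb(22000, 0.2, 600) + " MB");
        // The same container with an assumed ratio of 0.25 leaves a smaller heap.
        System.out.println("Estimated heap: " + heapMb(22000, 0.25, 600) + " MB");
    }
}
```

Under this reading, raising the cutoff ratio shrinks the heap while leaving the container size unchanged, so more of the container remains available for memory outside the heap.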





This is what the logs sometimes look like (with the I/O exception):
Job Manager:
/
2017-10-12 19:40:37,669 WARN  akka.remote.ReliableDeliverySupervisor                      
- Association with remote system [akka.tcp://flink@ip-172-31-11-129:43340]
has failed, address is now gated for [5000] ms. Reason: [Disassociated]
2017-10-12 19:40:37,922 INFO  org.apache.flink.yarn.YarnFlinkResourceManager              
- Container container_1507836035753_0001_01_000015 failed. Exit status: -100
2017-10-12 19:40:37,922 INFO  org.apache.flink.yarn.YarnFlinkResourceManager              
- Diagnostics for container container_1507836035753_0001_01_000015 in state
COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 19:40:37,922 INFO  org.apache.flink.yarn.YarnFlinkResourceManager              
- Total number of failed containers so far: 1
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager              
- Container container_1507836035753_0001_01_000013 failed. Exit status: -100
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager              
- Diagnostics for container container_1507836035753_0001_01_000013 in state
COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager              
- Total number of failed containers so far: 2
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager              
- Container container_1507836035753_0001_01_000002 failed. Exit status: -100
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager              
- Diagnostics for container container_1507836035753_0001_01_000002 in state
COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager              
- Total number of failed containers so far: 3
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager              
- Container container_1507836035753_0001_01_000003 failed. Exit status: -100
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager              
- Diagnostics for container container_1507836035753_0001_01_000003 in state
COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager              
- Total number of failed containers so far: 4
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager              
- Requesting new TaskManager container with 22000 megabytes memory. Pending
requests: 1
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager              
- Requesting new TaskManager container with 22000 megabytes memory. Pending
requests: 2
2017-10-12 19:40:37,924 INFO  org.apache.flink.yarn.YarnFlinkResourceManager              
- Requesting new TaskManager container with 22000 megabytes memory. Pending
requests: 3
2017-10-12 19:40:37,924 INFO  org.apache.flink.yarn.YarnJobManager                        
- Task manager akka.tcp://flink@ip-172-31-1-178:33620/user/taskmanager
terminated.
2017-10-12 19:40:37,924 INFO  org.apache.flink.yarn.YarnFlinkResourceManager              
- Requesting new TaskManager container with 22000 megabytes memory. Pending
requests: 4
2017-10-12 19:40:37,925 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN
Partition -> FlatMap (FlatMap at main(FindOutput.java:83)) -> Filter (Filter
at main(FindOutput.java:87)) (40/136) (748d815623ff13e6357f351d5aa7b0f4)
switched from RUNNING to FAILED.
java.lang.Exception: TaskManager was lost/killed:
container_1507836035753_0001_01_000015 @
ip-172-31-1-178.us-west-2.compute.internal (dataPort=35861)
        at
org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
        at
org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
        at
org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
        at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
        at
org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
        at
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
        at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:474)
        at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at
org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:100)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:103)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
        at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
        at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-10-12 19:40:37,931 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Output (11771c44eace0a1e32de1c3ca1c60b09) switched from state RUNNING to FAILING.
java.lang.Exception: TaskManager was lost/killed: container_1507836035753_0001_01_000015 @ ip-172-31-1-178.us-west-2.compute.internal (dataPort=35861)
        at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
        at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
        at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
        at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
        at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:474)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:100)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:103)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
/

Task Manager:
/
2017-10-12 19:40:34,959 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage stats: [HEAP: 1347/17600/17600 MB, NON HEAP: 73/74/-1 MB (used/committed/max)]
2017-10-12 19:40:34,959 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Direct memory stats: Count: 162, Total Capacity: 17111387, Used Memory: 17111388
2017-10-12 19:40:34,959 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Off-heap pool stats: [Code Cache: 19/19/240 MB (used/committed/max)], [Metaspace: 48/49/-1 MB (used/committed/max)], [Compressed Class Space: 5/6/1024 MB (used/committed/max)]
2017-10-12 19:40:34,959 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 8020, GC COUNT: 363], [G1 Old Generation, GC TIME (ms): 695, GC COUNT: 2]
2017-10-12 19:40:35,019 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage stats: [HEAP: 1467/17600/17600 MB, NON HEAP: 73/74/-1 MB (used/committed/max)]
2017-10-12 19:40:35,019 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Direct memory stats: Count: 196, Total Capacity: 17111659, Used Memory: 17111660
2017-10-12 19:40:35,019 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Off-heap pool stats: [Code Cache: 19/19/240 MB (used/committed/max)], [Metaspace: 48/49/-1 MB (used/committed/max)], [Compressed Class Space: 5/6/1024 MB (used/committed/max)]
2017-10-12 19:40:35,019 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 8020, GC COUNT: 363], [G1 Old Generation, GC TIME (ms): 695, GC COUNT: 2]
*2017-10-12 19:40:35,033 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.*
2017-10-12 19:40:35,043 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN Partition -> FlatMap (FlatMap at main(FindOutput.java:83)) -> Filter (Filter at main(FindOutput.java:87)) (86/136)
java.lang.RuntimeException: Emitting the record caused an I/O exception: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.BufferWriteRequest@21d84696
        at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
        at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
        at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
        at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at org.processing.core.ProcessingJob$ReadFiles.flatMap(ProcessingJob.java:104)
        at org.processing.core.ProcessingJob$ReadFiles.flatMap(ProcessingJob.java:89)
        at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
        at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:90)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.BufferWriteRequest@21d84696
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter.writeBlock(AsynchronousBufferFileWriter.java:36)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter.writeBlock(AsynchronousBufferFileWriter.java:26)
        at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.add(SpillableSubpartition.java:111)
        at org.apache.flink.runtime.io.network.partition.ResultPartition.add(ResultPartition.java:278)
        at org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter.writeBuffer(ResultPartitionWriter.java:72)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.writeAndClearBuffer(RecordWriter.java:223)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:121)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89)
        at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
        ... 13 more
2017-10-12 19:40:35,050 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager removed spill file directory /mnt/yarn/usercache/hadoop/appcache/application_1507836035753_0001/flink-io-81d98a3a-7a40-438f-93fa-3b1f9dfc1e1d
/

I still can't figure out why the TM shuts down or how to avoid it; it seems
like a memory/GC issue. Previously I was able to get the job to complete by
increasing parallelism (the number of task managers). But as my dataset size
has increased, I'm running into this issue again, and increasing parallelism
no longer helps.

Any help would be greatly appreciated!

Thanks




Re: Task Manager was lost/killed due to full GC

ShB
In reply to this post by ShB
I just wanted to leave an update about this issue for anyone else who might
come across it. The problem was with memory, but it was disk space and not
heap/off-heap memory. YARN was killing my containers because they exceeded
the threshold for disk utilization, and this manifested as "TaskManager was
lost/killed" or "JobClientActorConnectionTimeoutException: Lost connection
to the JobManager". Digging into the NodeManager logs on the individual
instances provided some hints that it was a disk issue.
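
To make the disk-utilization threshold concrete, here is a minimal Java sketch
of the kind of check involved: it computes the used percentage of a disk from
its total and usable space and compares it with a threshold. The class
DiskUtilizationCheck, its method names, the default directory /mnt/yarn (taken
from the spill path in the logs above) and the 90 percent default threshold are
assumptions for illustration; this is not YARN's actual implementation.

```java
import java.io.File;

// Hypothetical helper, not part of YARN or Flink: estimates how full a disk is
// and compares that with a utilization threshold given in percent.
final class DiskUtilizationCheck {

    // Percentage of the disk in use, derived from total and usable bytes.
    static double usedPercent(long totalBytes, long usableBytes) {
        if (totalBytes <= 0) {
            throw new IllegalArgumentException("totalBytes must be positive");
        }
        return 100.0 * (totalBytes - usableBytes) / totalBytes;
    }

    // True when utilization is strictly above the given threshold.
    static boolean exceedsThreshold(long totalBytes, long usableBytes,
                                    double thresholdPercent) {
        return usedPercent(totalBytes, usableBytes) > thresholdPercent;
    }

    public static void main(String[] args) {
        // Assumed inputs: a directory on the disk to check, and a threshold.
        File dir = new File(args.length > 0 ? args[0] : "/mnt/yarn");
        double threshold = args.length > 1 ? Double.parseDouble(args[1]) : 90.0;

        long total = dir.getTotalSpace();   // 0 if the path is not on a partition
        long usable = dir.getUsableSpace();
        if (total == 0) {
            System.out.println("Cannot determine disk size for " + dir);
            return;
        }
        System.out.printf("%s: %.1f%% used (threshold %.1f%%)%s%n",
                dir, usedPercent(total, usable), threshold,
                exceedsThreshold(total, usable, threshold) ? " -> OVER LIMIT" : "");
    }
}
```

Running it on a node, for example `java DiskUtilizationCheck /mnt/yarn 90`,
gives a quick indication of whether a disk is close to the limit while the job
is spilling data.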

Some fixes for this problem:
yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage
-- can be increased to alleviate the problem temporarily.
Increasing the disk capacity on each task manager is a more long-term fix.
Increasing the number of task managers increases the available disk space
and hence is also a fix.
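
As a sketch of the first fix, the property would typically be set in
yarn-site.xml on each NodeManager host. The value 95.0 below is only an
illustrative assumption, not a recommendation, and the NodeManagers usually
need a restart before the change takes effect.

```xml
<!-- yarn-site.xml: raise the per-disk utilization limit (illustrative value) -->
<property>
  <name>yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage</name>
  <value>95.0</value>
</property>
```

Raising the limit only buys headroom: if the job's spill files keep growing,
the disks will still fill up, which is why the other two fixes are the more
durable ones.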

Thanks!




Re: Task Manager was lost/killed due to full GC

Fabian Hueske-2
Thanks for the heads-up and for explaining how you resolved the issue!

Best, Fabian

In reply to ShB's post of 2017-10-18 3:50 GMT+02:00.