Over-requesting Containers on YARN

Over-requesting Containers on YARN

austin.ce
Hi all,

We have a Flink 1.6 streaming application running on Amazon EMR, with a YARN session configured with 20 GB for each Task Manager, 2 GB for the Job Manager, and 4 slots (one per vCore), running in detached mode. Each Core Node has 4 vCores, 32 GB memory, and 32 GB disk; each Task Node has 4 vCores, 8 GB memory, and 32 GB disk. We auto-scale the Core Nodes on HDFS Utilization and Capacity Remaining GB, and the Task Nodes on YARN Available Memory and the number of Pending Containers. We also have Log Aggregation turned on. This runs well under normal pressure for about a week, after which YARN can no longer satisfy the resource requests from Flink and pending container requests build up; even after scaling up, those requests don't seem to be fulfilled. I've seen that it seems to start. Does anyone have a good guide to setting up a streaming application on EMR with YARN?
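For reference, the session is launched roughly like this (a sketch only: the install path is the stock EMR location, and the flag values simply mirror the numbers above):

# Flink 1.6 yarn-session.sh sketch; -jm/-tm are in MB, -s is slots per Task Manager, -d is detached mode.
# Assumes the stock EMR install path /usr/lib/flink.
/usr/lib/flink/bin/yarn-session.sh -jm 2048 -tm 20480 -s 4 -d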

Thank you,
Austin Cawley-Edwards

Re: Over-requesting Containers on YARN

austin.ce
We are seeing the OutOfMemoryError below in the container logs. How can we increase the memory to take full advantage of the cluster, or do we just have to scale more aggressively?

Best,
Austin
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.Arrays.copyOfRange(Arrays.java:3664)
    at java.lang.String.<init>(String.java:207)
    at java.lang.String.substring(String.java:1969)
    at sun.reflect.misc.ReflectUtil.isNonPublicProxyClass(ReflectUtil.java:288)
    at sun.reflect.misc.ReflectUtil.checkPackageAccess(ReflectUtil.java:165)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1870)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1750)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2041)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
    at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:328)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:328)
    at akka.serialization.Serialization.akka$serialization$Serialization$$deserializeByteArray(Serialization.scala:156)
    at akka.serialization.Serialization$$anonfun$deserialize$2.apply(Serialization.scala:142)
    at scala.util.Try$.apply(Try.scala:192)
    at akka.serialization.Serialization.deserialize(Serialization.scala:136)
    at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:30)
    at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:64)
    at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:64)
    at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:82)
    at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:982)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:446)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
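For context, these are the memory knobs we've been looking at; the values below are illustrative, not what we currently run:

# flink-conf.yaml sketch: on YARN, Flink 1.6 reserves a cutoff of each container
# for off-heap/overhead before sizing the JVM heap.
containerized.heap-cutoff-ratio: 0.25   # fraction of container memory kept off the heap (1.6 default)
containerized.heap-cutoff-min: 600      # minimum cutoff in MB (1.6 default)

# Bigger containers can instead be requested at session start, e.g.
#   yarn-session.sh -jm 4096 -tm 24576 ...
# as long as yarn.nodemanager.resource.memory-mb and yarn.scheduler.maximum-allocation-mb
# on the nodes allow containers of that size.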


Re: Over-requesting Containers on YARN

austin.ce
Perhaps related to this, one of my tasks does not seem to be restoring / checkpointing correctly. It hangs during the checkpoint, eventually times out, and then reports "Checkpoint Coordinator is suspended." I have increased slot.idle.timeout as was recommended here, and though it ran longer, the checkpoint still failed.
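For reference, this is roughly what we've been adjusting (illustrative values; in Flink 1.6 the checkpoint timeout itself is set on the job rather than in flink-conf.yaml):

# flink-conf.yaml sketch
slot.idle.timeout: 300000   # ms before an idle slot is released; the 1.6 default is 50000

# Checkpoint timeout, set in the job code:
#   env.getCheckpointConfig().setCheckpointTimeout(600000);  // 10 minutes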

Thanks,
Austin
