Strange Behaviour with task manager oom ?

Strange Behaviour with task manager oom ?

sohimankotia
Hi,

I am running a Flink batch job.

The job runs fine with 4 task managers and 8 slots each (parallelism 32),
with 6 GB of memory per task manager.

As soon as I increase to 5 task managers with 6 slots each (parallelism 30,
still 6 GB of memory per task manager), I get an OOM error. I am not able to
understand this strange behaviour.

Container: container_e60_1526906661225_0213_01_000001 on dh-cdh-m1d5.dbp.host1.in_8041
============================================================================================
LogType:jobmanager.err
Log Upload Time:Mon May 21 19:48:12 +0530 2018
LogLength:3247
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/mnt/vol9/yarn/nm/usercache/hdfs/appcache/application_1526906661225_0213/filecache/13/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/mnt/vol7/yarn/nm/usercache/hdfs/appcache/application_1526906661225_0213/filecache/11/process-runners-0.0.2-SNAPSHOT-shaded.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.5.1-1.cdh5.5.1.p0.11/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Uncaught error from thread [flink-akka.remote.default-remote-dispatcher-5] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3236)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
        at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
        at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
        at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:874)
        at akka.remote.EndpointWriter.writeSend(Endpoint.scala:769)
        at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:744)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
        at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

LogType:jobmanager.log





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Strange Behaviour with task manager oom ?

Amit Jain
Hi,
Could you share the logs of the job and of the impacted task manager? How much
memory have you allocated to the Job Manager?

--
Thanks,
Amit


Re: Strange Behaviour with task manager oom ?

sohimankotia
Hi Amit,

Thanks for the response. Meanwhile I figured out the issue.

I had a class X extending RichMapFunction, and this class was preparing some
heavy data required by the map function. I moved that code to the open()
method and it worked fine.

So I have one question: was it because Flink was not able to serialize the
data that was being initialised in the constructor?

Thanks
Sohi
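
A possible explanation, sketched under assumptions: Flink ships user function
instances from the job manager to the task managers using Java serialization,
which would match the ObjectOutputStream frames in the jobmanager.err trace
above. Anything built in the constructor is then part of the serialized
object, while data built in open() is created on the task manager after
deserialization. The plain-Java sketch below only illustrates that difference
in serialized size. The names SerializedSizeDemo, EagerMapper and LazyMapper
are hypothetical, the open() method merely mimics the RichMapFunction
lifecycle hook, and no Flink API is used.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializedSizeDemo {

    // Hypothetical stand-in for "class X": the heavy data is built in the
    // constructor, so Java serialization has to write all of it out.
    static class EagerMapper implements Serializable {
        private static final long serialVersionUID = 1L;
        private final long[] lookup;

        EagerMapper() {
            this.lookup = new long[1_000_000]; // roughly 8 MB of payload
        }

        long map(int key) {
            return lookup[key];
        }
    }

    // Same data, but kept in a transient field and built in open(), which
    // is meant to be called on the worker after deserialization.
    static class LazyMapper implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient long[] lookup;

        void open() {
            this.lookup = new long[1_000_000];
        }

        long map(int key) {
            return lookup[key];
        }
    }

    // Serializes the object the same way ObjectOutputStream.writeObject does
    // in the stack trace and returns the number of bytes produced.
    static int serializedSize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.size();
    }

    public static void main(String[] args) throws IOException {
        System.out.println("eager: " + serializedSize(new EagerMapper()) + " bytes");
        System.out.println("lazy:  " + serializedSize(new LazyMapper()) + " bytes");
    }
}
```

If this is what happened, the data was serializable, just large: the job
manager would have to hold a serialized copy of it in heap while deploying
tasks, which is where the OutOfMemoryError was thrown. Why 4 task managers
worked and 5 did not cannot be determined from the log shown here.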


