Hi,
I am running a Flink batch job. The job runs fine if I use 4 task managers with 8 slots each = 32 parallelism, with 6 GB memory per task manager. As soon as I increase to 5 task managers with 6 slots per task manager = 30 parallelism (still 6 GB memory per task manager), I get an OOM error. I am not able to understand this strange behaviour.

Container: container_e60_1526906661225_0213_01_000001 on dh-cdh-m1d5.dbp.host1.in_8041
============================================================================================
LogType:jobmanager.err
Log Upload Time:Mon May 21 19:48:12 +0530 2018
LogLength:3247
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/mnt/vol9/yarn/nm/usercache/hdfs/appcache/application_1526906661225_0213/filecache/13/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/mnt/vol7/yarn/nm/usercache/hdfs/appcache/application_1526906661225_0213/filecache/11/process-runners-0.0.2-SNAPSHOT-shaded.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.5.1-1.cdh5.5.1.p0.11/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Uncaught error from thread [flink-akka.remote.default-remote-dispatcher-5] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
    at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
    at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
    at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
    at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
    at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:874)
    at akka.remote.EndpointWriter.writeSend(Endpoint.scala:769)
    at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:744)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

LogType:jobmanager.log

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi,
Could you share the logs of the job and of the impacted task manager? How much memory have you allocated to the Job Manager?

--
Thanks,
Amit

On Mon, May 21, 2018 at 8:46 PM, sohimankotia <[hidden email]> wrote:
> Hi,
>
> I am running a Flink batch job. The job runs fine if I use 4 task
> managers with 8 slots each = 32 parallelism, with 6 GB memory per task
> manager. As soon as I increase to 5 task managers with 6 slots per
> task manager = 30 parallelism (still 6 GB memory per task manager),
> I get an OOM error. I am not able to understand this strange behaviour.
> [...]
Hi Amit,

Thanks for the response. Meanwhile I figured out the issue. I had a class X extending RichMapFunction, and this class was preparing some heavy data required by the map function in its constructor. I moved that code into the open() method and it worked fine.

So I have one doubt: was it because Flink was not able to serialize the data that was being initialised in the constructor?

Thanks,
Sohi
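For anyone hitting the same thing: the difference between the two versions can be illustrated with a minimal plain-Java sketch (the class and field names below are hypothetical, not from the actual job; it uses plain java.io serialization, which is what Akka's JavaSerializer in the stack trace above ultimately delegates to). A function object that fills a big field in its constructor carries that field in its serialized form; a transient field built in an open()-style lifecycle method does not.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Heavy data built in the constructor becomes part of the object's
// serialized state, so it travels with the function when it is shipped.
class EagerMapper implements Serializable {
    private final Map<Integer, String> lookup = new HashMap<>();

    EagerMapper() {
        for (int i = 0; i < 100_000; i++) {
            lookup.put(i, "value-" + i);
        }
    }
}

// transient: excluded from Java serialization. The table is rebuilt
// after deserialization, analogous to RichMapFunction#open() in Flink.
class LazyMapper implements Serializable {
    private transient Map<Integer, String> lookup;

    void open() {
        lookup = new HashMap<>();
        for (int i = 0; i < 100_000; i++) {
            lookup.put(i, "value-" + i);
        }
    }
}

public class SerializedSizeDemo {
    // Serialize an object to a byte array and report its size.
    static int serializedSize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.size();
    }

    public static void main(String[] args) throws IOException {
        // The eager variant's serialized form carries the whole lookup
        // table (megabytes here); the lazy variant's is a few dozen bytes.
        System.out.println("eager=" + serializedSize(new EagerMapper()));
        System.out.println("lazy=" + serializedSize(new LazyMapper()));
    }
}
```

Multiply a serialized form like the eager one by 30 parallel task deployments buffered on the JobManager side and its heap can plausibly blow up, which would fit the OOM inside akka.serialization.JavaSerializer in the trace above.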