problem on yarn cluster


problem on yarn cluster

Michele Bertoni
Hi,
I have a problem running my app on a YARN cluster.

I developed it on my computer, and everything works fine there. Then we set up the environment on Amazon EMR, reading data from HDFS (not S3).

We run it with these commands:

./yarn-session.sh -n 3 -s 1 -jm 1024 -tm 4096
./flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 ../path/to/jar

We are using Flink 0.9.0-milestone-1.

After running it, the terminal window where we launch it crashes completely; the last messages are:

05/18/2015 15:19:56 Job execution switched to status FINISHED.
05/18/2015 15:19:56 Job execution switched to status FAILING.



This is the error from the YARN log:

2015-05-18 15:19:53 ERROR ApplicationMaster$$anonfun$2$$anon$1:66 - Could not process accumulator event of job 5429fe215a3953101cd575cd82b596c8 received from akka://flink/deadLetters.
java.lang.OutOfMemoryError: Java heap space
	at org.apache.flink.api.common.accumulators.ListAccumulator.read(ListAccumulator.java:91)
	at org.apache.flink.runtime.accumulators.AccumulatorEvent.getAccumulators(AccumulatorEvent.java:124)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:350)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99)
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:91)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2015-05-18 15:19:56 ERROR OneForOneStrategy:66 - java.lang.ClassNotFoundException: LowLevel.FlinkImplementation.FlinkDataTypes$GValue
org.apache.commons.lang3.SerializationException: java.lang.ClassNotFoundException: LowLevel.FlinkImplementation.FlinkDataTypes$GValue
	at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:230)
	at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
	at org.apache.flink.api.common.accumulators.ListAccumulator.getLocalValue(ListAccumulator.java:51)
	at org.apache.flink.api.common.accumulators.ListAccumulator.getLocalValue(ListAccumulator.java:35)
	at org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager.getJobAccumulatorResults(AccumulatorManager.java:77)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:300)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99)
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:91)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ClassNotFoundException: LowLevel.FlinkImplementation.FlinkDataTypes$GValue
	at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:274)
	at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1663)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
	at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:224)
	... 25 more






Can you help me understand what is going on?

Thanks


Re: problem on yarn cluster

Stephan Ewen
Hi Michele!

It looks like there are quite a few things going wrong in your case. Let me see what I can deduce from the output you are showing me:


1) You seem to be running into a bug that exists in 0.9-milestone-1 and has been fixed in master:

As far as I can tell, you call "collect()" on a data set, and the type that you collect is a custom type.
The milestone has a bug where it uses the wrong classloader, so "collect()" is unfortunately not supported for custom types there. The latest SNAPSHOT version should have this fixed.
We are pushing to finalize the code for the 0.9 release, so hopefully we will have an official release with that fix in a few weeks.
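The classloader bug described above boils down to how Java deserialization resolves classes: by default, ObjectInputStream looks classes up in the system classloader, which cannot see user-code types shipped in the job jar (such as FlinkDataTypes$GValue in the log). The general fix pattern is a stream that resolves classes against an explicitly supplied classloader. This is a minimal illustrative sketch of that pattern, not Flink's actual code; the class name is hypothetical.

```java
import java.io.*;

// An ObjectInputStream that resolves classes against an explicit ClassLoader
// instead of the JVM default. Deserializers that forget to do this throw
// ClassNotFoundException for any type that only the user-code loader can see.
class ClassLoaderObjectInputStream extends ObjectInputStream {
    private final ClassLoader loader;

    ClassLoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
        super(in);
        this.loader = loader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
        // Look the class up through the supplied loader; with the default
        // resolution, user types from the job jar would be invisible here.
        return Class.forName(desc.getName(), false, loader);
    }
}

public class Demo {
    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        byte[] bytes = serialize("hello");
        try (ObjectInputStream in = new ClassLoaderObjectInputStream(
                new ByteArrayInputStream(bytes), Demo.class.getClassLoader())) {
            System.out.println(in.readObject()); // prints "hello"
        }
    }
}
```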


2) The results that you collect seem to be very large, so the JobManager actually runs out of memory while collecting them. This situation should be improved in the current master as well,
even though it is still possible to exhaust the JobManager's heap with the collect() call (we plan to fix that soon as well).
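The usual workaround for this failure mode is to not funnel the whole result set through one JVM's heap at all: in the DataSet API that means writing results out with writeAsText() (e.g. to HDFS) instead of collect(). As a plain-Java sketch of the principle, assuming nothing about Flink internals, here is the difference between buffering every record in a list versus streaming each record straight to a sink:

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Streams records to a file sink one at a time, so memory use stays constant
// instead of growing with the result set (which is what an in-heap
// accumulator, like the ListAccumulator in the log above, does).
public class StreamToSink {
    public static Path writeAll(Iterable<String> records) throws IOException {
        Path out = Files.createTempFile("results", ".txt");
        try (BufferedWriter w = Files.newBufferedWriter(out)) {
            for (String r : records) {
                // Each record is written and released immediately.
                w.write(r);
                w.newLine();
            }
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        Path p = writeAll(List.of("a", "b", "c"));
        System.out.println(Files.readAllLines(p)); // prints [a, b, c]
    }
}
```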

Can you try and see if the latest SNAPSHOT version solves your issue?




BTW: It looks like you are actually starting two YARN sessions (Robert can probably comment on this):

./yarn-session.sh -n 3 -s 1 -jm 1024 -tm 4096
./flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 ../path/to/jar

The first line starts a YARN session against which you can run multiple jobs. The second line actually starts another dedicated YARN session for that job.



Greetings,
Stephan


On Mon, May 18, 2015 at 5:30 PM, Michele Bertoni <[hidden email]> wrote:



Re: problem on yarn cluster

rmetzger0
Hi,

./yarn-session.sh -n 3 -s 1 -jm 1024 -tm 4096

will start a YARN session with 4 containers (3 workers, 1 master). Once the session is running, you can submit as many jobs as you want to this session, using
./bin/flink run ./path/to/jar

The YARN session will create a hidden file in conf/ which contains the connection details for the ./bin/flink tool.

With
./flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 ../path/to/jar

you can start the job in ../path/to/jar directly on YARN. Behind the scenes, we'll start a YARN session just for executing this one job.
Since you're currently debugging your job, I would suggest starting one YARN session and submitting jobs against it.
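The two submission modes described above can be summarized as a shell session (flags and paths as used earlier in the thread; the jar names are placeholders):

```shell
# Mode 1: long-running YARN session -- start once, then submit many jobs to it.
./bin/yarn-session.sh -n 3 -s 1 -jm 1024 -tm 4096   # 3 workers + 1 master container
./bin/flink run ./path/to/first.jar                 # finds the session via the hidden file in conf/
./bin/flink run ./path/to/second.jar                # reuses the same containers

# Mode 2: dedicated per-job cluster -- a YARN session is started and torn down
# just for this one job; do not also run yarn-session.sh beforehand.
./bin/flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 ../path/to/jar
```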

Robert



On Mon, May 18, 2015 at 5:58 PM, Stephan Ewen <[hidden email]> wrote:



Re: problem on yarn cluster

Michele Bertoni
In reply to this post by Stephan Ewen
Hi! Thanks for the answer!

For 1) you are right: I am using the collect() method on a custom object, and tomorrow I will try moving to the SNAPSHOT version.
Why does it work correctly in my local environment, which is also milestone-1?

On the other hand, I am not collecting large results: I am running on a test dataset of a few KB. Moreover, I use collect() on it because it is a very small subset of the entire dataset that requires a lot of manipulation; in fact, the out-of-memory error didn't appear every time. I will try solving the first issue, and then let's see if this one reappears!


Cheers,
Michele


On 18 May 2015, at 17:58, Stephan Ewen <[hidden email]> wrote:

Hi Michele!

It looks like there are quite a few things going wrong in your case. Let me see what I can deduce from the output you are showing me:


1) You seem to run into a bug that exists in the 0.9-milestone-1 and has been fixed in the master:

As far as I can tell, you call "collect()" on a data set, and the type that you collect is a custom type.
The milestone has a bug there that it uses the wrong classloader, so "collect()" is unfortunately not supported there on custom types. The latest SNAPSHOT version should have this fixed.
We are pushing to finalize the code for the 0.9 release, so hopefully we have an official release with that fix in a few weeks.


2) The results that you collect seem to be very large, so the JobManager actually runs out of memory while collecting them. This situation should be improved in the current master as well,
even though it is still possible to break the master's heap with the collect() call (we plan to fix that soon as well).

Can you try and see if the latest SNAPSHOT version solves your issue?




BTW: It looks like you are starting two YARN sessions actually (Robert can probably comment on this)

./yarn-session.sh -n 3 -s 1 -jm 1024 -tm 4096
./flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 ../path/to/jar

The first line starts a YARN session against which you can run multiple jobs. The second line actually starts another dedicated YARN session for that job.



Greetings,
Stephan


On Mon, May 18, 2015 at 5:30 PM, Michele Bertoni <[hidden email]> wrote:
Hi,
I have a problem running my app on a Yarn cluster

I developed it in my computer and everything is working fine
then we setup the environment on Amazon EMR reading data from HDFS not S3

we run it with these command

./yarn-session.sh -n 3 -s 1 -jm 1024 -tm 4096
./flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 ../path/to/jar

we are using flink 0.9.0-milestone-1

after running it, the terminal windows where we launch it totally crash, the last messages are

05/18/2015 15:19:56 Job execution switched to status FINISHED.
05/18/2015 15:19:56 Job execution switched to status FAILING.



this is the error from the yarn log

2015-05-18 15:19:53 ERROR ApplicationMaster$$anonfun$2$$anon$1:66 - Could not process accumulator event of job 5429fe215a3953101cd575cd82b596c8 received from akka://flink/deadLetters.
java.lang.OutOfMemoryError: Java heap space
	at org.apache.flink.api.common.accumulators.ListAccumulator.read(ListAccumulator.java:91)
	at org.apache.flink.runtime.accumulators.AccumulatorEvent.getAccumulators(AccumulatorEvent.java:124)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:350)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99)
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:91)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2015-05-18 15:19:56 ERROR OneForOneStrategy:66 - java.lang.ClassNotFoundException: LowLevel.FlinkImplementation.FlinkDataTypes$GValue
org.apache.commons.lang3.SerializationException: java.lang.ClassNotFoundException: LowLevel.FlinkImplementation.FlinkDataTypes$GValue
	at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:230)
	at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
	at org.apache.flink.api.common.accumulators.ListAccumulator.getLocalValue(ListAccumulator.java:51)
	at org.apache.flink.api.common.accumulators.ListAccumulator.getLocalValue(ListAccumulator.java:35)
	at org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager.getJobAccumulatorResults(AccumulatorManager.java:77)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:300)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99)
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:91)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ClassNotFoundException: LowLevel.FlinkImplementation.FlinkDataTypes$GValue
	at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:274)
	at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1663)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
	at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:224)
	... 25 more






Can you help me understand what is going on?

thanks




Re: problem on yarn cluster

Michele Bertoni
In reply to this post by rmetzger0
Got it!

thank you

Best,
Michele

On 18 May 2015, at 19:29, Robert Metzger <[hidden email]> wrote:

Hi,

./yarn-session.sh -n 3 -s 1 -jm 1024 -tm 4096

will start a YARN session with 4 containers (3 workers, 1 master). Once the session is running, you can submit as many jobs as you want to this session, using
./bin/flink run ./path/to/jar

The YARN session will create a hidden file in conf/ which contains the connection details for the ./bin/flink tool.

With
./flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 ../path/to/jar

You can start the job in /path/to/jar directly on YARN. Behind the scenes, we'll start a YARN session just for executing this one job.
Since you're currently debugging your job, I would suggest starting one YARN session and submitting jobs against it.

Robert
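
For reference, the two submission modes described above look like this, using the example values from this thread (container counts and memory sizes are just the values Michele used, not required settings):

```shell
# Mode 1: long-running YARN session; submit as many jobs as you want against it.
./bin/yarn-session.sh -n 3 -s 1 -jm 1024 -tm 4096   # 3 TaskManagers, 1 slot each
./bin/flink run ./path/to/jar                        # picks up the hidden file in conf/

# Mode 2: per-job cluster; a dedicated YARN session is started just for this job.
./bin/flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 ./path/to/jar
```

Note that the `-yn`/`-yjm`/`-ytm` flags of the second form mirror the `-n`/`-jm`/`-tm` flags of `yarn-session.sh`; running both commands, as in the original post, starts two separate sessions.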



On Mon, May 18, 2015 at 5:58 PM, Stephan Ewen <[hidden email]> wrote:
Hi Michele!

It looks like there are quite a few things going wrong in your case. Let me see what I can deduce from the output you are showing me:


1) You seem to run into a bug that exists in the 0.9-milestone-1 and has been fixed in the master:

As far as I can tell, you call "collect()" on a data set, and the type that you collect is a custom type.
The milestone has a bug where it uses the wrong classloader, so "collect()" is unfortunately not supported on custom types there. The latest SNAPSHOT version should have this fixed.
We are pushing to finalize the code for the 0.9 release, so hopefully we have an official release with that fix in a few weeks.


2) The results that you collect seem to be very large, so the JobManager actually runs out of memory while collecting them. This situation should be improved in the current master as well,
even though it is still possible to break the master's heap with the collect() call (we plan to fix that soon as well).
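
The pattern behind point 2 is general and independent of Flink's API: collect() materializes the entire result in a single JVM's heap, while writing to a sink streams it out with constant memory. A minimal plain-Java sketch of the contrast (hypothetical method names, not Flink code):

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Sketch: materializing a result in one heap vs. streaming it to storage.
public class SinkVsCollect {

    // collect()-style: the entire result is held in driver memory at once.
    static long sumByCollecting(int n) {
        List<Integer> all = new ArrayList<>();
        for (int i = 0; i < n; i++) all.add(i);   // O(n) heap usage
        long sum = 0;
        for (int v : all) sum += v;
        return sum;
    }

    // sink-style: each record is written out immediately; heap stays O(1).
    static long sumByStreaming(int n, Path out) throws IOException {
        long sum = 0;
        try (BufferedWriter w = Files.newBufferedWriter(out)) {
            for (int i = 0; i < n; i++) {
                w.write(Integer.toString(i));
                w.newLine();
                sum += i;
            }
        }
        return sum;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("sink", ".txt");
        System.out.println(sumByCollecting(1000));      // 499500
        System.out.println(sumByStreaming(1000, tmp));  // 499500
        Files.delete(tmp);
    }
}
```

Both produce the same result, but only the first risks an OutOfMemoryError when n grows; that is why collecting large data sets to the JobManager is fragile.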

Can you try and see if the latest SNAPSHOT version solves your issue?




BTW: It looks like you are actually starting two YARN sessions (Robert can probably comment on this):

./yarn-session.sh -n 3 -s 1 -jm 1024 -tm 4096
./flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 ../path/to/jar

The first line starts a YARN session against which you can run multiple jobs. The second line actually starts another dedicated YARN session for that job.



Greetings,
Stephan







Re: problem on yarn cluster

rmetzger0
In reply to this post by Michele Bertoni
You are not getting the issue in your local environment because there everything runs in a single JVM, where the needed class is on the classpath.
In the distributed case, we have a special user-code classloader which can load classes from the user's jar.
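
The ClassNotFoundException in the log can be reproduced in isolation: Java deserialization resolves classes against a specific classloader, and fails if the user's type is not visible to it. A self-contained sketch (hypothetical names, not Flink internals):

```java
import java.io.*;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;

// Sketch: why deserialization fails when the resolving classloader
// cannot see the user's class (cf. FlinkDataTypes$GValue in the log).
public class UserCodeClassLoaderDemo {

    // Stand-in for a user-defined type that exists only in the job jar.
    static class GValue implements Serializable {
        private static final long serialVersionUID = 1L;
        final int v;
        GValue(int v) { this.v = v; }
    }

    // An ObjectInputStream that resolves classes against a chosen loader,
    // mimicking a JobManager deserializing accumulator contents without
    // the user-code classloader.
    static class LoaderAwareInput extends ObjectInputStream {
        private final ClassLoader loader;
        LoaderAwareInput(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }
        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            return Class.forName(desc.getName(), false, loader);
        }
    }

    static String deserializeWith(byte[] bytes, ClassLoader loader) throws IOException {
        try (LoaderAwareInput in = new LoaderAwareInput(new ByteArrayInputStream(bytes), loader)) {
            in.readObject();
            return "ok";
        } catch (ClassNotFoundException e) {
            return "ClassNotFoundException: " + e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        // Serialize a list holding the user type (what the accumulator ships).
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            ArrayList<GValue> list = new ArrayList<>();
            list.add(new GValue(42));
            oos.writeObject(list);
        }
        // The application classloader knows GValue: deserialization succeeds.
        System.out.println(deserializeWith(bos.toByteArray(),
                UserCodeClassLoaderDemo.class.getClassLoader()));
        // A loader with no URLs and the bootstrap loader as parent can see
        // java.util.ArrayList but not GValue: ClassNotFoundException.
        System.out.println(deserializeWith(bos.toByteArray(),
                new URLClassLoader(new URL[0], null)));
    }
}
```

The restricted loader plays the role of the JobManager in the buggy milestone: the container classes resolve fine, but the user's type does not.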

On Mon, May 18, 2015 at 9:16 PM, Michele Bertoni <[hidden email]> wrote:
Hi! Thanks for the answer!

For 1 you are right! I am using the collect method on a custom object; tomorrow I will try moving to the snapshot.
Why is it working correctly in my local environment (which is also milestone-1)?

On the other side, I am not collecting large results: I am running on a test dataset of a few KB. Moreover, I am using collect() because it is a very small subset of the entire set that requires a lot of manipulation; in fact, the out-of-memory error didn't appear every time. I will try solving the first issue, then let's see if this one reappears!


cheers
michele



Re: problem on yarn cluster

Michele Bertoni
It worked!
We used the 0.9-SNAPSHOT as you said, and it worked perfectly; even with larger data sets we didn't face any out-of-memory problem.

thanks
