Hello all,
I have a pretty complicated plan file using the Flink Python API running on a AWS EMR cluster of m3.xlarge instances using YARN. The plan is for a dictionary learning algorithm and has to run a sequence of operations many times; each sequence involves bulk iterations with join operations and other more intensive operations, and depends on the results of the previous sequence. I have found that when the number of times to run this sequence of operations is high (e.g. 20) I get this exception: Uncaught error from thread [flink-akka.remote.default-remote-dispatcher-7] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink] java.lang.StackOverflowError at java.io.ObjectOutputStream$ReplaceTable.lookup(ObjectOutputStream.java:2399) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1113) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ..........<snip similar traces>.................... I assume this problem is caused by having to send too many serialized operations between Java and Python. When using a Java implementation of the same operations, I also get: java.lang.OutOfMemoryError: GC overhead limit exceeded at org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:106) at org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:99) at org.apache.flink.optimizer.plan.DualInputPlanNode.<init>(DualInputPlanNode.java:90) at org.apache.flink.optimizer.plan.DualInputPlanNode.<init>(DualInputPlanNode.java:69) at org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties.instantiate(HashJoinBuildSecondProperties.java:81) at org.apache.flink.optimizer.dag.TwoInputNode.instantiate(TwoInputNode.java:607) ......<snip>...... The problem seems to caused by YARN's handling of memory, because I have gotten the same Python implementation to work on a smaller, local virtual cluster that is not using YARN, even though my local cluster has far fewer computing resources than the 15 GB RAM m3.xlarge AWS instances that EMR is using. After the YARN job has failed, sometimes a python process is left on the cluster using up most of the RAM. How can I solve this issue? I am unsure of how to reduce the number of operations while keeping the same functionality. Thanks, Geoffrey |
The Python API is in alpha state currently, so we would have to check if it is related specifically to that. Looping in Chesnay who worked on that.
The JVM GC error happens on the client side as that's where the optimizer runs. How much memory does the client submitting the job have? How do you compose the job? Do you have nested loops, e.g. for() { ... bulk iteration Flink program }? – Ufuk On 14 November 2016 at 08:02:26, Geoffrey Mon ([hidden email]) wrote: > Hello all, > > I have a pretty complicated plan file using the Flink Python API running on > a AWS EMR cluster of m3.xlarge instances using YARN. The plan is for a > dictionary learning algorithm and has to run a sequence of operations many > times; each sequence involves bulk iterations with join operations and > other more intensive operations, and depends on the results of the previous > sequence. I have found that when the number of times to run this sequence > of operations is high (e.g. 20) I get this exception: > > Uncaught error from thread > [flink-akka.remote.default-remote-dispatcher-7] shutting down JVM > since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink] > java.lang.StackOverflowError > at java.io.ObjectOutputStream$ReplaceTable.lookup(ObjectOutputStream.java:2399) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1113) > at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > .............................. > > I assume this problem is caused by having to send too many serialized > operations between Java and Python. When using a Java implementation of the > same operations, I also get: > > java.lang.OutOfMemoryError: GC overhead limit exceeded > at org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:106) > at org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:99) > at org.apache.flink.optimizer.plan.DualInputPlanNode.(DualInputPlanNode.java:90) > at org.apache.flink.optimizer.plan.DualInputPlanNode.(DualInputPlanNode.java:69) > at org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties.instantiate(HashJoinBuildSecondProperties.java:81) > at org.apache.flink.optimizer.dag.TwoInputNode.instantiate(TwoInputNode.java:607) > ............ > > The problem seems to caused by YARN's handling of memory, because I have > gotten the same Python implementation to work on a smaller, local virtual > cluster that is not using YARN, even though my local cluster has far fewer > computing resources than the 15 GB RAM m3.xlarge AWS instances that EMR is > using. After the YARN job has failed, sometimes a python process is left on > the cluster using up most of the RAM. > > How can I solve this issue? I am unsure of how to reduce the number of > operations while keeping the same functionality. > > Thanks, > Geoffrey > |
Hi Ufuk,
The master instance of the cluster was also a m3.xlarge instance with 15 GB RAM, which I would've expected to be enough. I have gotten the program to run successfully on a personal virtual cluster where each node has 8 GB RAM and where the master node was also a worker node, so the problem appears to have something to do with YARN's memory behavior (such as on EMR). Nevertheless, it would probably be a good idea to modify my code to reduce its memory usage. When running my code on my local cluster, performance was probably bottlenecked. The job does use a for loop to run the core operations for a specific number of times, specified as a command line parameter. If it helps, here is my code: Python: https://github.com/quinngroup/pyflink-r1dl/blob/master/R1DL_Flink.py (L260 is the core for loop) Java: https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java (L120 is the core for loop) I would expect the join operations to be a big cause of the excessive memory usage. Thanks! Geoffrey On Mon, Nov 14, 2016 at 5:13 AM Ufuk Celebi <[hidden email]> wrote: The Python API is in alpha state currently, so we would have to check if it is related specifically to that. Looping in Chesnay who worked on that. |
Hello, I know that the reuse of the data set in my plan is causing the problem (after one dictionary atom is learned using the data set "S", "S" is updated for use with the next dictionary atom). When I comment out the line updating the data set "S", I have no problem and the plan processing phase takes substantially less time. I assume that this is because updating and reusing "S" makes the graph of transformations much more complicated and forces the optimizer to do much more work, since for example the final value of "S" depends on all previous operations combined. Is there a way to replace the for loop in my plan so that I don't cause this complication and so that memory usage is manageable? I considered making "S" an iterative data set, but I need to save each dictionary atom to a file, and I wouldn't be able to do that if "S" was iterative and not finalized. Perhaps I would be able to collect "S" at the end of each dictionary atom and then make the new "S" directly from these values. This however would require that "collect" be implemented in the Python API. In addition, I don't think the problem is YARN-specific anymore because I have been able to reproduce it on a local machine. Cheers, Geoffrey On Mon, Nov 14, 2016 at 11:38 AM Geoffrey Mon <[hidden email]> wrote:
|
Hello,
implementing collect() in python is not that trivial and the gain is questionable. There is an inherent size limit (think 10mb), and it is a bit at odds with the deployment model of the Python API. Something easier would be to execute each iteration of the for-loop as a separate job and save the result in a file. Note that right now the Pyhton API can't execute multiple jobs from the same file; we would need some modifications in the PythonPlanBinder to allow this. Regards, Chesnay On 20.11.2016 23:54, Geoffrey Mon wrote:
|
Hello Chesnay, Thanks for the advice. I've begun adding multiple jobs per Python plan file here: https://issues.apache.org/jira/browse/FLINK-5183 and https://github.com/GEOFBOT/flink/tree/FLINK-5183 The functionality of the patch works. I am able to run multiple jobs per file successfully, but the process doesn't exit once the jobs are done. This is because the main issue that I am encountering is that although I added a check for PythonPlanBinder to check for more jobs from the Python process until the Python process exits, there is a race condition where Python process usually exits after Java checks to see if it is still running. Therefore, unless you use a debugger to pause the Java process until the Python process exits or some other phenomenon happens where Python exits fast enough, the Java process thinks that the Python process is still alive and will end up waiting indefinitely for more Python jobs. Another one minor comment I had with my own patch was that I used global variables to keep track of the number of execution environments and to differentiate between different environments. Is there a better way to do this? Thanks! Cheers, Geoffrey On Wed, Nov 23, 2016 at 5:41 AM, Chesnay Schepler <[hidden email]> wrote:
-- Geoffrey Mon "Are you going to lay there and get killed, or get up and do something about it?" —Unidentified Lieutenant at Easy Red sector, Omaha Beach |
Free forum by Nabble | Edit this page |