Issue with running Flink Python jobs on cluster

Geoffrey Mon
Hello all,

I've set up Flink on a very small cluster of one master node and five worker nodes, following the instructions in the documentation (https://ci.apache.org/projects/flink/flink-docs-master/setup/cluster_setup.html). I can run the included examples like WordCount and PageRank across the entire cluster, but when I try to run simple Python examples, I sometimes get a strange error on the first PythonMapPartition about the temporary folders that contain the streams of data between Python and Java.

If I run jobs only on the TaskManager on the master node, Python examples run fine. However, if the jobs use the worker nodes, I get the following error:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
<snip>
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806)
<snip>
Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: External process for task MapPartition (PythonMap) terminated prematurely.
python: can't open file '/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py': [Errno 2] No such file or directory
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
<snip>
Caused by: java.lang.RuntimeException: External process for task MapPartition (PythonMap) terminated prematurely.
python: can't open file '/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py': [Errno 2] No such file or directory
at org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:144)
at org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:92)
at org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:477)
... 5 more

I suspect this issue has something to do with how data is sent between the master and the workers, but I haven't been able to find any solutions. Presumably the temporary files were not received properly and thus were never created?

Thanks in advance.

Cheers,
Geoffrey

Re: Issue with running Flink Python jobs on cluster

Chesnay Schepler
Hello Geoffrey,

How often does this occur?

Flink distributes the user code and the Python library using the Distributed Cache.

Either the file is deleted right after being created for some reason, or the DC returns a file name before the file has been created (which shouldn't happen; it should block until the file is available).

If you are up to debugging this, I would suggest looking into the FileCache class and verifying whether the file in question is in fact created.

The logs of the TaskManager on which the exception occurs could be of interest too; could you send them to me?

Regards,
Chesnay

Re: Issue with running Flink Python jobs on cluster

Geoffrey Mon
Hello,

Here is the TaskManager log on pastebin:

I will look into whether the files were created.

By the way, the cluster consists of virtual machines running on BlueData EPIC. I don't know whether that might be related to the problem.

Thanks,
Geoffrey

Re: Issue with running Flink Python jobs on cluster

Geoffrey Mon
I've come across similar issues when trying to set up Flink on Amazon EC2 instances. Is there perhaps something wrong with my setup? Here is the flink-conf.yaml I am using:

Thanks,
Geoffrey

Re: Issue with running Flink Python jobs on cluster

Chesnay Schepler
Could you write a Java job that uses the Distributed Cache to distribute files?

If this fails, then the DC is faulty; if it doesn't, something in the Python API is wrong.
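
Something along these lines would do. This is only a sketch (the class name, file path, and data are placeholders, not an actual test we ship), but getFile() is the call that should block until the DC has copied the file to the node:

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;

    public class DistributedCacheCheck {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Register a file to be shipped to every TaskManager via the DC.
            // Note: this path must be reachable from the nodes that stage the
            // file; a purely local path only works for local execution.
            env.registerCachedFile("file:///path/to/cache.txt", "testFile");

            env.fromElements("a", "b", "c")
                .map(new RichMapFunction<String, String>() {
                    private String firstLine;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // getFile() should block until the DC has copied the
                        // registered file into the local tmp directory.
                        File file = getRuntimeContext().getDistributedCache().getFile("testFile");
                        try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
                            firstLine = reader.readLine();
                        }
                    }

                    @Override
                    public String map(String value) {
                        return firstLine + ": " + value;
                    }
                })
                .print();
        }
    }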

Re: Issue with running Flink Python jobs on cluster

Geoffrey Mon
I wrote a simple Java plan that reads a file from the distributed cache and uses the first line of that file in a map operation. Sure enough, it works locally but fails when the job is sent to a taskmanager on a worker node. Since DistributedCache seems to work for everyone else, I'm thinking that perhaps some file permissions are not set properly, so that Flink is unable to write the distributed cache files.

I used inotify-tools to watch the temporary files directory on both the master node and a worker node. While the plan was being prepared, the jobmanager node wrote the Python modules and the plan file to its temporary files directory. On the worker node, however, the directory tree was created, but the job failed before writing any of the module or plan files was even attempted. Interestingly enough, there were no error messages or warnings about the cache.

Cheers,
Geoffrey

Re: Issue with running Flink Python jobs on cluster

Geoffrey Mon
The Java program I used to test DistributedCache was faulty: it actually created the cache from files on the machine where the program was running (i.e. the worker node).

I tried setting up a cluster again, this time using two physical machines instead of virtual machines, and ran into the same error: the Python libraries and plan file are not found in the temporary directory. Has anyone else been able to set up a Flink cluster that runs Python jobs successfully? I'm beginning to suspect that Flink itself may have a problem running Python jobs on clusters.

Cheers,
Geoffrey

Re: Issue with running Flink Python jobs on cluster

Chesnay Schepler
Does this mean the revised DistributedCache job ran successfully?

Re: Issue with running Flink Python jobs on cluster

Chesnay Schepler
Please also post the job you're trying to run.

Re: Issue with running Flink Python jobs on cluster

Geoffrey Mon
I haven't yet figured out how to write a Java job that tests DistributedCache functionality between machines; I've only gotten worker nodes to create caches from their own local files, rather than from files on the master node. The DistributedCache test I've been using (based on the DistributedCacheTest unit test) is here:

I realized that this test only exercised local files because I kept getting an error that the cache's source file could not be found, until I created that file on the worker node at the location specified in the plan.

I've been trying to run a simple Python example that does word counting:

I've tried three different setups so far: virtual machines, AWS instances, and physical machines. With each setup, I get the same errors.

Although basic Java jobs (like WordCount and PageRank) run on all three of these setups, Python programs cannot run because the files they need are not properly distributed to the worker nodes. I've found that although the master node reads the Python libraries and plan files (presumably to send them to the workers), the worker node never writes any of those files to disk, despite the files being added to the distributed cache's file list via DistributedCache.writeFileInfoToConfig (which I found via remote debugging).

When a Python program is run via pyflink, it starts executing but crashes as soon as it reaches any sort of mapping operation. The following exception is thrown:

2016-07-17 09:39:50,857 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - MapPartition (PythonFlatMap -> PythonMap) (1/1) (12fbc21f0424ab87f3ef579fbe73b0b3) switched from RUNNING to FAILED
2016-07-17 09:39:50,863 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job 3685c180d7eb004154e8f9d94996e0cf (Flink Java Job at Sun Jul 17 09:39:49 EDT 2016) changed to FAILING.
java.lang.Exception: The user defined 'open()' method caused an exception: An error occurred while copying the file.
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
<snip>
Caused by: java.lang.RuntimeException: An error occurred while copying the file.
at org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
at org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:102)
<snip>
Caused by: java.io.FileNotFoundException: File file:/tmp/flink does not exist or the user running Flink ('gmon') has insufficient permissions to access it.
at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:109)
at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:242)
at org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:322)
<snip>
... 1 more

If the pyflink library is manually copied into place at /tmp/flink, that error will be replaced by the following:

2016-07-17 00:10:54,342 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - MapPartition (PythonFlatMap -> PythonMap) (1/1) (23591303d5b571a6b3e9b68ef51c5a8e) switched from RUNNING to FAILED
2016-07-17 00:10:54,348 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job e072403ffec32bd14b54416b53cb46ae (Flink Java Job at Sun Jul 17 00:10:51 EDT 2016) changed to FAILING.
java.lang.Exception: The user defined 'open()' method caused an exception: External process for task MapPartition (PythonFlatMap -> PythonMap) terminated prematurely.
python3: can't open file '/tmp/flink-dist-cache-724a9274-f984-429c-a0bf-4f5f23c5cfbc/e072403ffec32bd14b54416b53cb46ae/flink/plan.py': [Errno 2] No such file or directory
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
<snip>
Caused by: java.lang.RuntimeException: External process for task MapPartition (PythonFlatMap -> PythonMap) terminated prematurely.
python3: can't open file '/tmp/flink-dist-cache-724a9274-f984-429c-a0bf-4f5f23c5cfbc/e072403ffec32bd14b54416b53cb46ae/flink/plan.py': [Errno 2] No such file or directory
at org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:144)
at org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:92)
at org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:48)
<snip>
... 3 more

Both of these exceptions point to Flink not properly copying the requested files. Has anyone else successfully run Python jobs on a Flink cluster, or is there a bug preventing this from working? Unfortunately, I am relying on a Flink cluster to run a Python job on some scientific data, and the work needs to be completed soon.

Thanks for your assistance,
Geoffrey

Re: Issue with running Flink Python jobs on cluster

Chesnay Schepler
Well, now I know what the problem could be.

You are trying to execute a job on a cluster (i.e., not locally), but have set the local flag to true:
    env.execute(local=True)

Due to this flag, the files are only copied into the tmp directory of the node where you execute the plan, and thus are not accessible from the other worker nodes.

To use the Python API on a cluster, you must have a filesystem that is accessible to all workers (such as HDFS) to which the files can be copied; from there they can be distributed to the nodes via the DC.
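
For what it's worth, the Java API behaves the same way, which would also explain the earlier DistributedCache test: the TaskManager copies a registered file from the path you give it, so a node-local path only works when the file exists on that node. A sketch (paths and names are placeholders):

    // Only works if the file exists at this path on every node that stages it:
    env.registerCachedFile("file:///tmp/cache.txt", "localFile");
    // Works on a cluster, because every worker can reach the shared filesystem:
    env.registerCachedFile("hdfs://namenode:9000/tmp/cache.txt", "sharedFile");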

On 17.07.2016 17:33, Geoffrey Mon wrote:
I haven't yet figured out how to write a Java job to test DistributedCache functionality between machines; I've only gotten worker nodes to create caches from local files (on the same worker nodes), rather than on files from the master node. The DistributedCache test I've been using (based on the DistributedCacheTest unit test) is here:

I realized that this test only tested local files because I was getting an error that the file used for the cache was not found until I created that file on the worker node in the location specified in the plan.

I've been trying to run a simple Python example that does word counting:

I've tried three different setups so far: I've tried virtual machines, AWS virtual machine instances, and physical machines. With each setup, I get the same errors.

Although with all three of these setups, basic Java jobs can be run (like WordCount, PageRank), Python programs cannot be run because the files needed to run them are not properly distributed to the worker nodes. I've found that although the master node reads the Python libraries and plan files (presumably to send them to the worker), the worker node never writes any of those files to disk, despite the files being added to the list of files in the distributed cache via DistributedCache.writeFileInfotoConfig (which I found via remote debugging).

When a Python program is run via pyflink, it executes but crashes as soon as there is any sort of operation requiring mapping. The following exception is thrown:

2016-07-17 09:39:50,857 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - MapPartition (PythonFlatMap -> PythonMap) (1/1) (12fbc21f0424ab87f3ef579fbe73b0b3) switched from RUNNING to FAILED
2016-07-17 09:39:50,863 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job 3685c180d7eb004154e8f9d94996e0cf (Flink Java Job at Sun Jul 17 09:39:49 EDT 2016) changed to FAILING.
java.lang.Exception: The user defined 'open()' method caused an exception: An error occurred while copying the file.
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
<snip>
Caused by: java.lang.RuntimeException: An error occurred while copying the file.
at org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
at org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:102)
<snip>
Caused by: java.io.FileNotFoundException: File file:/tmp/flink does not exist or the user running Flink ('gmon') has insufficient permissions to access it.
at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:109)
at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:242)
at org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:322)
<snip>
... 1 more

If the pyflink library is manually copied into place at /tmp/flink, that error will be replaced by the following:

2016-07-17 00:10:54,342 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - MapPartition (PythonFlatMap -> PythonMap) (1/1) (23591303d5b571a6b3e9b68ef51c5a8e) switched from RUNNING to FAILED
2016-07-17 00:10:54,348 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job e072403ffec32bd14b54416b53cb46ae (Flink Java Job at Sun Jul 17 00:10:51 EDT 2016) changed to FAILING.
java.lang.Exception: The user defined 'open()' method caused an exception: External process for task MapPartition (PythonFlatMap -> PythonMap) terminated prematurely.
python3: can't open file '/tmp/flink-dist-cache-724a9274-f984-429c-a0bf-4f5f23c5cfbc/e072403ffec32bd14b54416b53cb46ae/flink/plan.py': [Errno 2] No such file or directory
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
<snip>
Caused by: java.lang.RuntimeException: External process for task MapPartition (PythonFlatMap -> PythonMap) terminated prematurely.
python3: can't open file '/tmp/flink-dist-cache-724a9274-f984-429c-a0bf-4f5f23c5cfbc/e072403ffec32bd14b54416b53cb46ae/flink/plan.py': [Errno 2] No such file or directory
at org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:144)
at org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:92)
at org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:48)
<snip>
... 3 more

Both of these exceptions point to Flink not properly copying the requested files. Has anyone else successfully run Python jobs on a Flink cluster, or is there a bug preventing successful operation? Unfortunately, I am relying on using a Flink cluster to run a Python job for some scientific data that needs to be completed soon.

Thank for your assistance,
Geoffrey


Re: Issue with running Flink Python jobs on cluster

Geoffrey Mon
Hello Chesnay,

Thank you very much! With your help I've managed to set up a Flink cluster that can run Python jobs successfully. I solved my issue by removing local=True and installing HDFS in a separate cluster.

I don't think the documentation clearly mentions that HDFS is required for clusters running Python jobs. Would it be a good idea to add that?

Cheers,
Geoffrey

On Sun, Jul 17, 2016 at 11:58 AM Chesnay Schepler <[hidden email]> wrote:
Well, now I know what the problem could be.

You are trying to execute a job on a cluster (== not local), but have set the local flag to true.
    env.execute(local=True)

Due to this flag the files are only copied into the tmp directory of the node where you execute the plan, and are thus not accessible from other worker nodes.

In order to use the Python API on a cluster you must have a filesystem that is accessible by all workers (like HDFS) to which the files can be copied. From there they can be distributed to the nodes via the DC.
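
To make the distinction concrete, here is a sketch of the two execute calls (the plan itself is unchanged; only the final line differs):

    # Local mode: the plan runs only on this node, and the cached files are
    # copied only into this node's tmp directory.
    env.execute(local=True)

    # Cluster mode: the plan is submitted to the jobmanager; the user code and
    # the pyflink library must first be staged on a filesystem every worker
    # can reach (e.g. HDFS), from which the Distributed Cache ships them.
    env.execute()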



Re: Issue with running Flink Python jobs on cluster

Chesnay Schepler
Glad to hear it! The HDFS requirement should most definitely be documented; I assumed it already was, actually...


Re: Issue with running Flink Python jobs on cluster

Maximilian Michels
Hi!

HDFS is mentioned in the docs but not explicitly listed as a requirement: https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/python.html#project-setup

I suppose the Python API could also distribute its libraries through Flink's BlobServer.

Cheers,
Max
