Writing in flink clusters

Debaditya Roy
Hello users,

I have written and executed a Flink program on a cluster. The program is supposed to write its output to a text file sink. However, I cannot find the text files in the target directory on the cluster nodes, yet when I re-execute the program a second time, it fails with the expected error:

Caused by: java.io.IOException: File or directory already exists. Existing files and directories are not overwritten in NO_OVERWRITE mode. Use OVERWRITE mode to overwrite existing files and directories.
    at org.apache.flink.core.fs.FileSystem.initOutPathLocalFS(FileSystem.java:595)
    at org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:227)
    at org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:78)
    at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:60)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)

Is this because I am executing the program in a distributed environment? If so, how can I write the text files?

Thanks in advance.

Warm Regards,
Debaditya

Re: Writing in flink clusters

Alexis Gendronneau
Hi Roy,

Have you looked on the nodes that ran the sink tasks? You should be able to find them in the Flink web interface by clicking on the sink task. If you get the OVERWRITE error, your output is certainly somewhere.
By the way, in distributed mode it is easier to write to a shared file system such as HDFS. That way you will find the result under the same path no matter where the sink tasks ran.
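The following is a small stdlib-only Java model of the point above. It shows where a file sink's output ends up.

It is not Flink code. It assumes the following default behaviour of Flink's file sinks:

* With parallelism 1, the sink writes a single file at the given path.
* Otherwise, it creates a directory at that path holding one file per parallel subtask, named 1, 2, ..., n.

The `subtaskToHost` assignment is made up for illustration. Incidentally, the `initOutPathLocalFS` frame in the stack trace suggests the job wrote to a local path. That is the first case printed by `main`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Simplified model of where a file sink's output lands. This is NOT Flink code;
 * the naming rule below is an assumption about Flink's default FileOutputFormat behaviour.
 */
class SinkPathModel {

    /**
     * Returns one "location:path" entry per parallel sink subtask.
     *
     * @param outputPath    the path given to the sink, e.g. "/tmp/out"
     * @param parallelism   number of parallel sink subtasks
     * @param subtaskToHost hypothetical mapping of subtask index to the TaskManager host that ran it
     * @param distributedFs true for a shared file system such as HDFS
     */
    static List<String> locate(String outputPath, int parallelism,
                               Map<Integer, String> subtaskToHost, boolean distributedFs) {
        List<String> files = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            // Parallelism 1: a single file at outputPath.
            // Otherwise: outputPath becomes a directory holding files "1".."n".
            String path = parallelism == 1 ? outputPath : outputPath + "/" + (subtask + 1);
            // Local file system: the file sits on the disk of whichever host ran the subtask.
            // Distributed file system: every node sees the same path.
            String location = distributedFs ? "shared" : subtaskToHost.get(subtask);
            files.add(location + ":" + path);
        }
        return files;
    }

    public static void main(String[] args) {
        Map<Integer, String> hosts = Map.of(0, "nodeA", 1, "nodeB", 2, "nodeA");
        // Local path: three files scattered over two machines' local disks.
        System.out.println(locate("/tmp/out", 3, hosts, false));
        // HDFS path: the same three files under one shared directory.
        System.out.println(locate("hdfs:///tmp/out", 3, hosts, true));
    }
}
```

In the real job, the corresponding change would presumably be:

* Point the sink at a shared path.
* State the write mode explicitly, for example `stream.writeAsText("hdfs:///tmp/out", FileSystem.WriteMode.OVERWRITE)` with `org.apache.flink.core.fs.FileSystem` imported. Here `stream` and the path are placeholders.
* Call `.setParallelism(1)` on the returned sink if you want a single file instead of a directory.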

Regards,

Alexis.

In reply to Debaditya Roy, 2016-07-13 15:20 GMT+02:00

Re: Writing in flink clusters

Debaditya Roy
Thanks. Will check. :-)

In reply to Alexis Gendronneau, Wed, Jul 13, 2016 at 3:35 PM

Re: Writing in flink clusters

Chesnay Schepler
In reply to this post by Alexis Gendronneau
Hello,

Is that the complete error message? I'm a bit surprised that it does not name the file. If it really doesn't, we should change that.
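Below is a minimal sketch of the change being suggested: include the offending path in the NO_OVERWRITE message.

This is plain Java using `java.nio.file`, not Flink's `FileSystem.initOutPathLocalFS`. The `OutputPathCheck` class and its `prepare` method are hypothetical. The enum only borrows the value names of Flink's `FileSystem.WriteMode`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper, not Flink code: checks an output path before a sink writes to it. */
class OutputPathCheck {

    /** Borrows the value names of Flink's FileSystem.WriteMode; this enum itself is illustrative. */
    enum WriteMode { NO_OVERWRITE, OVERWRITE }

    /**
     * NO_OVERWRITE: fails if the target exists, naming the path in the message.
     * OVERWRITE: removes an existing file or empty directory. Files.deleteIfExists throws
     * DirectoryNotEmptyException for a non-empty directory; this sketch does not recurse.
     */
    static void prepare(Path target, WriteMode mode) throws IOException {
        if (!Files.exists(target)) {
            return; // nothing in the way
        }
        if (mode == WriteMode.NO_OVERWRITE) {
            throw new IOException("File or directory '" + target + "' already exists. "
                    + "Existing files and directories are not overwritten in NO_OVERWRITE mode. "
                    + "Use OVERWRITE mode to overwrite existing files and directories.");
        }
        Files.deleteIfExists(target);
    }

    public static void main(String[] args) throws IOException {
        // A freshly created temp file stands in for the output left behind by a first run.
        Path out = Files.createTempFile("flink-sketch", ".txt");
        try {
            prepare(out, WriteMode.NO_OVERWRITE);
        } catch (IOException e) {
            System.out.println(e.getMessage()); // the message now contains the full path
        }
        prepare(out, WriteMode.OVERWRITE); // removes the old file
        System.out.println("exists after OVERWRITE: " + Files.exists(out));
    }
}
```

With the path in the message, the first error would have shown directly which location still held the earlier output.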

Regards,
Chesnay Schepler

Re: Writing in flink clusters

rmetzger0
I agree with Chesnay that we should report the file name.
Can you create a [hotfix] PR for that?

In reply to Chesnay Schepler, Wed, Jul 13, 2016 at 3:46 PM