Writing to an HDFS file from a Flink stream job

Writing to an HDFS file from a Flink stream job

Isuru Suriarachchi
Hi all,

I'm trying to use an HDFS file as the sink for my Flink stream job, using the following line:

stream.writeAsText("hdfs://hadoop-master:9000/user/isuru/foo");

I have not set "fs.hdfs.hadoopconf" in my Flink configuration, since according to [1] it should work with a fully qualified HDFS URI.
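
(For reference, my understanding is that setting it explicitly would look something like this in flink-conf.yaml; the path is just an example:)

# example only: directory containing core-site.xml and hdfs-site.xml
fs.hdfs.hadoopconf: /etc/hadoop/conf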

However, it doesn't work as expected. The file foo is created on HDFS, but it is empty, and I don't see any errors in the Flink logs either. When I use a normal file sink with a "file:///..." URL, it works fine and the data is in the file.

Do I need any other configuration to get this working?

Thanks,
Isuru


Re: Writing to an HDFS file from a Flink stream job

Piotr Nowojski
Hi,

Maybe this is an access rights issue? Could you try to create and write to the same file (same directory) in some other way (manually?), using the same user and the same machine as the Flink job does?
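
For example, something like this, run as the same user the Flink TaskManagers run as (paths are just examples):

echo test > /tmp/foo.txt
hdfs dfs -put /tmp/foo.txt hdfs://hadoop-master:9000/user/isuru/
hdfs dfs -cat hdfs://hadoop-master:9000/user/isuru/foo.txt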

Maybe there will be some hint in the HDFS logs?

Piotrek



Re: Writing to an HDFS file from a Flink stream job

Aljoscha Krettek
Hi Isuru,

What is the source in your job, and is the job terminating at some point or running continuously?

In general, the writeAsText()/writeAsCsv() methods should not be used, because they don't work well in an infinite streaming job that can have failures and recovery, i.e. what should happen to the file when the job recovers? For writing to files you should use the BucketingSink: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/filesystem_sink.html
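
A minimal sketch, following the example in those docs (the bucketer and batch size here are just examples):

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

// write to time-bucketed part files, rolling each part file at ~400 MB
BucketingSink<String> sink = new BucketingSink<String>("hdfs://hadoop-master:9000/user/isuru/foo");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
sink.setBatchSize(1024 * 1024 * 400);

stream.addSink(sink);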

Best,
Aljoscha


Re: Writing to an HDFS file from a Flink stream job

Piotr Nowojski
I think the issue might be that writeAsText (TextOutputFormat) doesn't flush the data anywhere (only on close, which in a streaming job never happens). You would need to use a custom output format, but as Aljoscha pointed out, BucketingSink makes more sense for streaming applications.
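
If you did want to go that route, an untested sketch might look like the class below; note that whether flush() actually makes the data visible on HDFS depends on the underlying stream's flush/hflush semantics, so this is illustration only:

import java.io.IOException;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.core.fs.Path;

// untested sketch: flush after every record (slow; for illustration only)
public class FlushingTextOutputFormat<T> extends TextOutputFormat<T> {

    public FlushingTextOutputFormat(Path outputPath) {
        super(outputPath);
    }

    @Override
    public void writeRecord(T record) throws IOException {
        super.writeRecord(record);
        this.stream.flush(); // 'stream' is the protected output stream inherited from FileOutputFormat
    }
}

used with something like stream.writeUsingOutputFormat(new FlushingTextOutputFormat<String>(new Path("hdfs://hadoop-master:9000/user/isuru/foo"))).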

Piotrek


Re: Writing to an HDFS file from a Flink stream job

Isuru Suriarachchi
Thanks for all your directions. BucketingSink worked.

Isuru
