Is 'DataStream.writeAsCsv' supposed to work like this?

Is 'DataStream.writeAsCsv' supposed to work like this?

Rex Ge
Hi, flinkers!

I'm new to this whole thing, and it seems to me that 'org.apache.flink.streaming.api.datastream.DataStream.writeAsCsv(String, WriteMode, long)' does not work properly. To be specific, data are not flushed at the configured update frequency when writing to HDFS.

What makes it more disturbing is that if I check the content with 'hdfs dfs -cat xxx', I sometimes get partial records.
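
For reference, here is a minimal sketch of the kind of job I mean (the input and the output path are made up for illustration; the sink call matches the signature quoted above):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.core.fs.FileSystem.WriteMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class WriteAsCsvRepro {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Toy bounded input; my real job reads an unbounded stream.
            env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2))
               // Third argument is the update frequency in milliseconds.
               // I would expect the HDFS file to be flushed roughly every
               // 5 seconds, but it is not.
               .writeAsCsv("hdfs:///tmp/out.csv", WriteMode.OVERWRITE, 5000L);

            env.execute("writeAsCsv repro");
        }
    }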


I did a little digging in flink-0.9.1, and it turns out that all 'org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(IN)' does is push data to 'org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream', which is a delegate of 'org.apache.hadoop.fs.FSDataOutputStream'.

In this scenario, 'org.apache.hadoop.fs.FSDataOutputStream' is never flushed, which results in data being held in a local buffer, so 'hdfs dfs -cat xxx' may return partial records.


Is 'DataStream.writeAsCsv' supposed to work like this, or did I mess up somewhere?


Best regards and thanks for your time!

Rex

Re: Is 'DataStream.writeAsCsv' supposed to work like this?

Márton Balassi
Hey Rex,

Writing half-baked records is definitely unwanted; thanks for spotting this. Most likely it can be solved by adding a flush at the end of every invoke call. Let me check.

Best,

Marton


Re: Is 'DataStream.writeAsCsv' supposed to work like this?

Márton Balassi
The problem persists in the current master; a format.flush() is simply needed here [1]. I'll do a quick hotfix. Thanks for the report again!

[1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java#L99
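
For the record, the shape of the change (a sketch of the flush path in FileSinkFunction; exact field and method names are from memory and may differ slightly):

    // Where the sink hands its buffered records to the output format,
    // the format itself must be flushed too; otherwise the bytes sit in
    // the FSDataOutputStream buffer and never reach HDFS.
    for (IN record : tupleList) {
        format.writeRecord(record);
    }
    format.flush();   // the missing call
    tupleList.clear();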


Re: Is 'DataStream.writeAsCsv' supposed to work like this?

Maximilian Michels
I'm not sure we really want to flush on every invoke call. If you want to flush every time, you can set the update condition to 0 milliseconds; that way, flush will be called for every record. In the API this is exposed through FileSinkFunctionByMillis. If you flush every time, performance might degrade.
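
For illustration, with the writeAsCsv variant Rex quoted that just means passing 0 as the last argument (a sketch; 'stream' stands for the tuple DataStream being written):

    // 0 ms update frequency: FileSinkFunctionByMillis flushes on every
    // record. Safe against partial files, but expect lower throughput.
    stream.writeAsCsv("hdfs:///tmp/out.csv", WriteMode.OVERWRITE, 0L);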

By the way, you may also use the RollingFileSink, which splits the output into several files per hour, day, or week. You can then be sure those files have been completely written to HDFS.

Best regards,
Max


Re: Is 'DataStream.writeAsCsv' supposed to work like this?

Márton Balassi
Hey Max,

The solution I am proposing is not flushing on every record; it makes sure that the flushing is forwarded from the sink function to the output format whenever it is triggered. Practically, this means the buffering is done (almost) solely in the sink and no longer in the output format.
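
In shape, the patched sink looks roughly like this (a sketch rather than the exact code; it assumes the output format exposes flush(), as in the hotfix above):

    @Override
    public void invoke(IN value) throws Exception {
        buffer.add(value);            // buffering now lives in the sink
        if (updateCondition()) {      // time- or count-based trigger
            for (IN record : buffer) {
                format.writeRecord(record);
            }
            format.flush();           // forwarded flush: the buffered
                                      // bytes actually reach HDFS
            buffer.clear();
        }
    }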


Re: Is 'DataStream.writeAsCsv' supposed to work like this?

Stephan Ewen
That makes sense...


Re: Is 'DataStream.writeAsCsv' supposed to work like this?

Maximilian Michels
Yes, that does make sense! Thank you for explaining. Have you made the change yet? I couldn't find it on master.
