Re: Does 'DataStream.writeAsCsv' suppose to work like this?

Posted by Márton Balassi
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Does-DataStream-writeAsCsv-suppose-to-work-like-this-tp3259p3261.html

The problem persists in the current master; adding a format.flush() here [1] is all that is needed. I'll do a quick hotfix, thanks for the report again!

[1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java#L99
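For illustration, here is a stripped-down sketch of the buffering behaviour being discussed. This is not the actual FileSinkFunction source; the class, field, and method names are made up, and a plain java.io.Writer stands in for the CSV output format and the HDFS stream.

    import java.io.IOException;
    import java.io.Writer;
    import java.util.ArrayList;
    import java.util.List;

    // Simplified illustration only -- not the real FileSinkFunction.
    // Records are buffered locally and written out in batches; the hotfix
    // boils down to the flush() call after a batch has been written.
    public class BufferingCsvSinkSketch<IN> {

        private final Writer out;                 // stands in for the CSV format / HDFS output stream
        private final List<IN> pending = new ArrayList<IN>();

        public BufferingCsvSinkSketch(Writer out) {
            this.out = out;
        }

        public void invoke(IN value) throws IOException {
            pending.add(value);                   // incoming records are buffered first
            if (updateConditionMet()) {           // e.g. the update frequency passed to writeAsCsv
                for (IN record : pending) {
                    out.write(record.toString());
                    out.write('\n');
                }
                out.flush();                      // the missing call: without it the bytes can sit in a
                                                  // local buffer and 'hdfs dfs -cat' shows partial records
                pending.clear();
            }
        }

        private boolean updateConditionMet() {
            return true;                          // placeholder for the real time/record-count condition
        }
    }

Whether the real fix flushes the format or the underlying stream, the idea is the same: make a batch visible to readers as soon as it has been written.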

On Mon, Oct 26, 2015 at 8:23 AM, Márton Balassi <[hidden email]> wrote:
Hey Rex,

Writing half-baked records is definitely unwanted, thanks for spotting this. Most likely it can be solved by adding a flush at the end of every invoke() call; let me check.

Best,

Marton

On Mon, Oct 26, 2015 at 7:56 AM, Rex Ge <[hidden email]> wrote:
Hi, flinkers!

I'm new to this whole thing,
and it seems to me that 'org.apache.flink.streaming.api.datastream.DataStream.writeAsCsv(String, WriteMode, long)' does not work properly.
To be specific, data are not flushed at the requested update frequency when writing to HDFS.

What makes it more disturbing is that if I check the content with 'hdfs dfs -cat xxx', I sometimes get partial records.
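For context, here is a minimal sketch of the kind of job being described. The source, tuple type, output path, and the 1000 ms update frequency are just example values, not taken from the original report.

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.core.fs.FileSystem.WriteMode;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class WriteAsCsvExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Any stream of tuples will do for the illustration.
            DataStream<Tuple2<String, Integer>> counts = env.fromElements(
                    new Tuple2<String, Integer>("a", 1),
                    new Tuple2<String, Integer>("b", 2),
                    new Tuple2<String, Integer>("c", 3));

            // Ask for the CSV sink to be updated roughly every 1000 ms. With the
            // buffering issue described in this thread, the HDFS file can lag behind
            // and 'hdfs dfs -cat' may show half-written records in the meantime.
            counts.writeAsCsv("hdfs:///tmp/flink-csv-test", WriteMode.OVERWRITE, 1000L);

            env.execute("writeAsCsv update-frequency test");
        }
    }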


I did a little digging in flink-0.9.1,
and it turns out that all 'org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(IN)' does
is push data to 'org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream',
which is a delegate of 'org.apache.hadoop.fs.FSDataOutputStream'.

In this scenario, the 'org.apache.hadoop.fs.FSDataOutputStream' is never flushed,
which results in data being held in a local buffer, so 'hdfs dfs -cat xxx' might return partial records.
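For illustration, here is one possible user-side workaround sketch, under a few assumptions: the Flink 0.9.x sink interface (exact method signatures vary across versions), a Hadoop 2.x client for hflush(), and a made-up output path. Flushing after every record is slow and is only meant to show the idea, not to be a definitive implementation.

    import java.net.URI;
    import java.nio.charset.StandardCharsets;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical workaround sketch: write lines through the Hadoop client and
    // flush after every record so readers never see a half-written line.
    public class FlushingHdfsLineSink extends RichSinkFunction<String> {

        private final String path;                  // e.g. "hdfs:///tmp/my-output.csv" (example only)
        private transient FSDataOutputStream out;

        public FlushingHdfsLineSink(String path) {
            this.path = path;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
            FileSystem fs = FileSystem.get(URI.create(path), hadoopConf);
            out = fs.create(new Path(path), true);  // overwrite if the file already exists
        }

        @Override
        public void invoke(String value) throws Exception {
            out.write((value + "\n").getBytes(StandardCharsets.UTF_8));
            out.hflush();                           // Hadoop 2.x: make the bytes visible to readers
        }

        @Override
        public void close() throws Exception {
            if (out != null) {
                out.close();
            }
        }
    }

Records would first be mapped to their CSV/line representation and then routed to this sink with addSink.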


Is 'DataStream.writeAsCsv' supposed to work like this, or did I mess up somewhere?


Best regards and thanks for your time!

Rex