Hi, flinkers!
I'm new to this whole thing, and it seems to me that 'org.apache.flink.streaming.api.datastream.DataStream.writeAsCsv(String, WriteMode, long)' does not work properly. To be specific, data are not flushed at the update frequency when writing to HDFS.

What makes it more disturbing is that if I check the content with 'hdfs dfs -cat xxx', I sometimes get partial records.

I did a little digging in flink-0.9.1, and it turns out that all 'org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(IN)' does is push data to 'org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream', which is a delegate of 'org.apache.hadoop.fs.FSDataOutputStream'. In this scenario, 'org.apache.hadoop.fs.FSDataOutputStream' is never flushed, which results in data being held in a local buffer, so 'hdfs dfs -cat xxx' might return partial records.

Is 'DataStream.writeAsCsv' supposed to work like this, or did I mess up somewhere?

Best regards, and thanks for your time!

Rex
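For reference, a minimal sketch of the setup that shows the symptom (path and data are hypothetical; Flink 0.9.x-style API):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WriteAsCsvRepro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2))
           // Update frequency of 1000 ms: records should reach HDFS roughly
           // every second, but since the underlying FSDataOutputStream is
           // never flushed they can linger in the client-side buffer.
           .writeAsCsv("hdfs:///tmp/flush-test.csv", WriteMode.OVERWRITE, 1000L);

        env.execute("writeAsCsv flush repro");
    }
}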
Hey Rex,

Writing half-baked records is definitely unwanted, thanks for spotting this. Most likely it can be solved by adding a flush at the end of every invoke call; let me check.

Best,

Marton

On Mon, Oct 26, 2015 at 7:56 AM, Rex Ge <[hidden email]> wrote:
The problem persists in the current master; simply a format.flush() is needed here [1]. I'll do a quick hotfix, thanks for the report again!

[1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java#L99

On Mon, Oct 26, 2015 at 8:23 AM, Márton Balassi <[hidden email]> wrote:
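For context, a hedged sketch of what the hotfix amounts to inside FileSinkFunction; the method and field names here are illustrative, not verbatim Flink source, and it assumes the format exposes a flush():

// Write out the records buffered in the sink, then forward the flush to
// the OutputFormat so the underlying FSDataOutputStream pushes its
// buffer out to HDFS.
private void flushRecords() throws IOException {
    for (IN record : bufferedRecords) {
        format.writeRecord(record); // fills HadoopDataOutputStream's buffer
    }
    bufferedRecords.clear();
    format.flush(); // the missing call the hotfix adds
}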
Not sure whether we really want to flush at every invoke call. If you want to flush every time, you may want to set the update condition to 0 milliseconds; that way, flush will be called for every record. In the API this is exposed by using the FileSinkFunctionByMillis. If you flush every time, performance might degrade.

By the way, you may also use the RollingFileSink, which splits the output into several files per hour, day, or week. You can then be sure those files are already completely written to HDFS.

Best regards,
Max

On Mon, Oct 26, 2015 at 8:36 AM, Márton Balassi <[hidden email]> wrote:
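For illustration, a hedged sketch of both options. It assumes the rolling sink Max mentions is RollingSink from flink-connector-filesystem (on master / 0.10); the paths, data, and bucket format are hypothetical:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;

public class FlushOptions {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> data =
                env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2));

        // Option 1: update frequency of 0 ms, i.e. flush after every record
        // (FileSinkFunctionByMillis underneath); trades throughput for freshness.
        data.writeAsCsv("hdfs:///tmp/out.csv", WriteMode.OVERWRITE, 0L);

        // Option 2: a rolling sink writes time-bucketed files, which are
        // guaranteed complete once they are rolled and closed.
        RollingSink<Tuple2<String, Integer>> sink =
                new RollingSink<>("hdfs:///tmp/rolling");
        sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HH"));
        data.addSink(sink);

        env.execute("flush options");
    }
}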
Hey Max,

The solution I am proposing is not flushing on every record, but making sure to forward the flushing from the sink function to the output format whenever it is triggered. Practically this means that the buffering is done (almost) solely in the sink and not in the output format any more.

On Mon, Oct 26, 2015 at 10:11 AM, Maximilian Michels <[hidden email]> wrote:
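A hedged sketch of the resulting control flow in the sink, using the illustrative flushRecords() helper from above (names are not verbatim Flink source):

// Records accumulate in the sink's own buffer; only when the update
// condition fires are they handed to the OutputFormat and the flush
// forwarded to it.
@Override
public void invoke(IN record) throws Exception {
    bufferedRecords.add(record);
    if (updateCondition()) {  // e.g. the update frequency has elapsed
        flushRecords();       // writes the buffer and calls format.flush()
        resetParameters();    // hypothetical: restart the update-frequency timer
    }
}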
That makes sense...

On Mon, Oct 26, 2015 at 12:31 PM, Márton Balassi <[hidden email]> wrote:
Yes, that does make sense! Thank you for explaining. Have you made the change yet? I couldn't find it on the master.

On Wed, Nov 18, 2015 at 5:16 PM, Stephan Ewen <[hidden email]> wrote:
> That makes sense...