Limitations in StreamingFileSink BulkFormat

Limitations in StreamingFileSink BulkFormat

Ayush Verma-2
Hello,

I am using the StreamingFileSink in bulk format in my Flink stream-processing job to write Parquet files to S3. StreamingFileSink.BulkFormatBuilder does not offer a way to set a custom rolling policy; it rolls the files whenever a checkpoint triggers. It would be better to have a rolling policy based on both size and time. One option is to write our own StreamingFileSink that does accept a custom rolling policy, but I suspect there is a reason for the current behaviour.
I would like to get the opinion of Flink experts on this, and to hear of any potential workarounds that achieve the desired behaviour.

Kind regards
Ayush Verma
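
For illustration, the size-or-time rolling decision asked for above can be sketched in plain Java. The class and method names below are illustrative, not Flink API; Flink's own DefaultRollingPolicy (available for row formats) applies a similar size/time check:

```java
// Illustrative sketch (not Flink API): roll a part file when it grows past a
// size limit OR has been open longer than a time limit.
class SizeTimeRollingCheck {
    private final long maxPartSizeBytes;
    private final long rolloverIntervalMs;

    SizeTimeRollingCheck(long maxPartSizeBytes, long rolloverIntervalMs) {
        this.maxPartSizeBytes = maxPartSizeBytes;
        this.rolloverIntervalMs = rolloverIntervalMs;
    }

    /** True when the current part file should be closed and a new one opened. */
    boolean shouldRoll(long currentSizeBytes, long openedAtMs, long nowMs) {
        return currentSizeBytes >= maxPartSizeBytes
                || (nowMs - openedAtMs) >= rolloverIntervalMs;
    }
}
```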

Re: Limitations in StreamingFileSink BulkFormat

Timothy Victor
Not an expert, but I would think this will not be trivial: rolling on checkpoints is what guarantees exactly-once semantics in the event of a failure, so the sink is tightly integrated with the checkpointing mechanism.  The precursor to the StreamingFileSink was the BucketingSink, which I believe did give some control over when to save, but it also suffered from duplicates in the files.  I vaguely recall reading a Flink blog post on this, but I can't recall it right now.

I sort of have the same desire, but I worked around it by periodically merging Parquet files (doable as long as the schema is the same).  This runs out of process, of course.

Tim
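
To make the out-of-process workaround above concrete, here is a minimal sketch of just the batch-planning step: greedily grouping small output files into merge batches up to a target size. All names are illustrative, and the actual Parquet merge (only valid when schemas match) would need a Parquet reader/writer, which is deliberately left out:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not a real tool): plan merge batches for an
// out-of-process compactor by greedily packing file sizes up to a target.
class CompactionPlanner {
    static List<List<Long>> planBatches(List<Long> fileSizes, long targetBytes) {
        List<List<Long>> batches = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentTotal = 0L;
        for (long size : fileSizes) {
            // Start a new batch when adding this file would overshoot the target.
            if (!current.isEmpty() && currentTotal + size > targetBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentTotal = 0L;
            }
            current.add(size);
            currentTotal += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```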


Re: Limitations in StreamingFileSink BulkFormat

Woessner, Leo
I have created a process outside of Flink for this.  It would be nice to use Flink for it, though.

It is important to us that we only checkpoint after the records are successfully saved in S3.
This is to ensure all records survive a node failure.
The process I wrote appends records to a file on disk; when the size or time threshold is passed, the file is written to S3.  Only then is the checkpoint written.

Is this semantic possible in Flink?
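
The buffer-then-upload process described above can be sketched as follows; everything here is illustrative rather than Flink or AWS API, with a callback standing in for the S3 write. In the real process, checkpoint acknowledgement would follow only after a successful upload:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch (not Flink or AWS API): buffer records locally and hand
// them to an uploader once a record-count or age threshold passes.
class BufferedUploader {
    private final int maxRecords;
    private final long maxAgeMs;
    private final Consumer<List<String>> uploader;
    private final List<String> buffer = new ArrayList<>();
    private long firstRecordAtMs;

    BufferedUploader(int maxRecords, long maxAgeMs, Consumer<List<String>> uploader) {
        this.maxRecords = maxRecords;
        this.maxAgeMs = maxAgeMs;
        this.uploader = uploader;
    }

    void add(String record, long nowMs) {
        if (buffer.isEmpty()) {
            firstRecordAtMs = nowMs;
        }
        buffer.add(record);
        if (buffer.size() >= maxRecords || nowMs - firstRecordAtMs >= maxAgeMs) {
            uploader.accept(new ArrayList<>(buffer)); // upload first...
            buffer.clear();                           // ...then acknowledging is safe
        }
    }
}
```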



--
Leo Woessner
Domain Engineering
Pearson Education