StreamingFileSink Avro batch size and compression

Taher Koitawala
Hi All,
         Is there a way to specify batch size and compression properties when using StreamingFileSink, just like we could with the bucketing sink? The only parameters it accepts are the inactivity bucket check interval and the Avro schema.

          We have numerous Flink jobs pulling data from the same Kafka topics but performing different operations. Each job writes files of a different size, and we would like to make that consistent.


Regards,
Taher Koitawala
GS Lab Pune
+91 8407979163

Re: StreamingFileSink Avro batch size and compression

Taher Koitawala
Can someone please help with this?


Re: StreamingFileSink Avro batch size and compression

Dawid Wysakowicz

Hi,

Generally speaking, you can control the part/batch size through the RollingPolicy [1]. Unfortunately, bulk formats use OnCheckpointRollingPolicy, and as far as I know it does not allow adjusting its rolling behaviour based on part size. Maybe Kostas has an idea how to do that in the least invasive way. For how to do it with non-bulk formats, you can have a look at [2].
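
For a row-encoded (non-bulk) sink, a part-size based rolling policy could look roughly like the sketch below. This is only an illustration: the encoder and the size/interval values are made-up placeholders, not something from your job.

StreamingFileSink
    .forRowFormat(
        Path.fromLocalFile(folder),
        new SimpleStringEncoder<String>("UTF-8"))    // placeholder encoder
    .withRollingPolicy(
        DefaultRollingPolicy.create()
            .withMaxPartSize(128 * 1024 * 1024)      // roll once a part file reaches ~128 MB
            .withRolloverInterval(15 * 60 * 1000)    // or after 15 minutes
            .withInactivityInterval(5 * 60 * 1000)   // or after 5 minutes with no new records
            .build())
    .build()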

I assume you were using AvroParquetWriter. You can specify compression on the ParquetWriter, I guess the same way as before. The code for doing it can look something like this:

StreamingFileSink.forBulkFormat(
    Path.fromLocalFile(folder),
    new ParquetWriterFactory<>(out ->
        AvroParquetWriter.<Datum>builder(out)
            .withSchema(...)            // the Avro schema of your records
            .withDataModel(...)         // the Avro data model matching the schema
            .withCompressionCodec(...)  // the Parquet compression codec
            .build()))
    .build()
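
Just to illustrate, the placeholders could be filled in along these lines (a made-up example assuming Datum is a plain POJO and reflect-based Avro conversion; CompressionCodecName comes from parquet-hadoop):

AvroParquetWriter.<Datum>builder(out)
    .withSchema(ReflectData.get().getSchema(Datum.class))  // schema derived via reflection
    .withDataModel(ReflectData.get())                       // data model matching that schema
    .withCompressionCodec(CompressionCodecName.SNAPPY)      // e.g. Snappy compression
    .build()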

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.html

[2] https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java


Re: StreamingFileSink Avro batch size and compression

Taher Koitawala
Thanks, I'll check it out. 
