Flink S3 sink unable to compress data

Flink S3 sink unable to compress data

amran dean
Hello,
I am writing a custom S3 object encoder (code here: https://pastebin.com/raw/9Ag4ZVpX) used via:

StreamingFileSink<KafkaRecord> dataSink = StreamingFileSink
    .forRowFormat(new Path("s3a://some_path"), new RecordDataSerializer())
    .build();

During execution, the sink produces no data in S3. The pipeline works if the data is not compressed beforehand, and no exceptions are thrown in the Flink logs or in the web UI. I am confused about what is happening and would appreciate any help.

Re: Flink S3 sink unable to compress data

Ravi Bhushan Ratnakar
Hi,

As per my understanding, the Encoder's encode method is called for each and every message, so it is not correct to create a compressor around the given output stream inside encode; doing so leads to unpredictable, erroneous behavior. The Encoder's responsibility is to encode the given object, not to compress the stream. It seems that, at the moment, the row format does not support compression.


If you want to write compressed output, you could provide your own BulkWriter implementation and use the bulk format instead; a sketch follows.
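
For illustration, here is a minimal, hypothetical sketch of a gzip-compressing BulkWriter. The class name is a placeholder, and it assumes your KafkaRecord exposes its payload as bytes via a getValue() accessor (that accessor is an assumption, not Flink API):

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

// Sketch: gzip-compress every record written to a part file.
public class GzipRecordWriter implements BulkWriter<KafkaRecord> {

    private final GZIPOutputStream out;

    public GzipRecordWriter(FSDataOutputStream stream) throws IOException {
        this.out = new GZIPOutputStream(stream);
    }

    @Override
    public void addElement(KafkaRecord record) throws IOException {
        out.write(record.getValue()); // assumes KafkaRecord exposes its bytes
        out.write('\n');
    }

    @Override
    public void flush() throws IOException {
        out.flush();
    }

    @Override
    public void finish() throws IOException {
        // Write the gzip trailer; the sink closes the underlying stream itself.
        out.finish();
    }
}

It would then be wired into the sink with forBulkFormat instead of forRowFormat:

BulkWriter.Factory<KafkaRecord> factory = GzipRecordWriter::new;

StreamingFileSink<KafkaRecord> dataSink = StreamingFileSink
    .forBulkFormat(new Path("s3a://some_path"), factory)
    .build();

Note that bulk formats roll part files on every checkpoint, so each compressed file is finished cleanly.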

Regards, 
Ravi 

