Hi all,

I am trying to write a sink function that receives strings and creates compressed files in time buckets. The code is pretty straightforward and based on
CompressWriterFactoryTest:

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.compress.CompressWriterFactory;
import org.apache.flink.formats.compress.CompressWriters;
import org.apache.flink.formats.compress.extractor.DefaultExtractor;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.hadoop.conf.Configuration;

CompressWriterFactory<String> writer =
    CompressWriters.forExtractor(new DefaultExtractor<String>())
        .withHadoopCompression("Gzip", new Configuration());

StreamingFileSink.forBulkFormat(new Path(getTargetPath()), writer)
    .withBucketAssigner(new DateTimeBucketAssigner<>(getDataTimeBucketFormat(getDataTimeBucket())))
    .build();

When I tried to add it as a sink (dataStream.addSink), the app crashed due to:
org.apache.hadoop.io.compress.GzipCodec@55e3d6c3 is not serializable. The object probably contains or references non serializable fields.

Well, I guess I am using something incorrectly, but I am not sure what.
Or maybe I should convert the SinkFunction into a serializable one, but how would I do that?
Best regards
Eyal Peer
Hi Eyal,
This is a known issue which has been fixed (see [1]) and will be part of the next releases.

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-16371

On Tue, Mar 24, 2020 at 11:10 AM Eyal Pe'er <[hidden email]> wrote:
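Until that fix is released, a common way to work around "not serializable" errors in Flink functions is to ship only a serializable description of the heavy object (here, the codec's class name) and re-create the object lazily after deserialization on the task manager. The sketch below illustrates the pattern with plain Java serialization; LazyCodecHolder is a hypothetical helper, and java.util.ArrayList is just a stand-in for the codec class, not the actual Hadoop GzipCodec.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical holder: keeps a serializable class name instead of the
// (possibly non-serializable) object itself, and rebuilds the object
// lazily on first use after deserialization.
public class LazyCodecHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String codecClassName;   // serializable description
    private transient Object codec;        // dropped by serialization, rebuilt lazily

    public LazyCodecHolder(String codecClassName) {
        this.codecClassName = codecClassName;
    }

    // Instantiate the object the first time it is needed (i.e. on the worker).
    public Object getCodec() throws Exception {
        if (codec == null) {
            codec = Class.forName(codecClassName).getDeclaredConstructor().newInstance();
        }
        return codec;
    }

    public static void main(String[] args) throws Exception {
        // java.util.ArrayList stands in for a codec class in this sketch.
        LazyCodecHolder holder = new LazyCodecHolder("java.util.ArrayList");

        // Round-trip through Java serialization, as Flink does when it
        // ships a function from the client to the task managers.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(holder);
        oos.flush();
        LazyCodecHolder restored = (LazyCodecHolder) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();

        // The transient field was not serialized, but getCodec() rebuilds it.
        System.out.println(restored.getCodec().getClass().getName());
    }
}
```

In a real Flink job the same idea usually lives in a RichFunction's open() method: leave the non-serializable field transient and create it there, since open() runs on the task manager after deserialization.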