Hi all,
I am trying to write a sink function that receives strings and writes them as compressed files in time buckets.
The code is pretty straightforward and based on
CompressWriterFactoryTest:
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.compress.CompressWriterFactory;
import org.apache.flink.formats.compress.extractor.DefaultExtractor;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.formats.compress.CompressWriters;
import org.apache.hadoop.conf.Configuration;
CompressWriterFactory<String> writer = CompressWriters.forExtractor(new DefaultExtractor<String>())
        .withHadoopCompression("Gzip", new Configuration());

StreamingFileSink<String> sink = StreamingFileSink
        .forBulkFormat(new Path(getTargetPath()), writer)
        .withBucketAssigner(new DateTimeBucketAssigner<>(getDataTimeBucketFormat(getDataTimeBucket())))
        .build();
When I tried to add it as a sink (dataStream.addSink(sink)), the app crashed with:
org.apache.hadoop.io.compress.GzipCodec@55e3d6c3 is not serializable. The object probably contains or references non serializable fields.
Well, I guess I used something incorrectly, but I am not sure what.
Or maybe I should convert the SinkFunction into a serializable one, but how can I do that?
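For context, my understanding is that Flink Java-serializes user functions (and the writer factory inside the sink) before shipping them to the cluster, so the failure can be reproduced outside Flink with plain JDK serialization. A minimal, self-contained sketch (the helper name `ensureSerializable` is mine, not a Flink API) of the check I believe is failing:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;

public class SerializabilityCheck {

    // Mimics what happens when Flink ships an object to the cluster:
    // Java-serialize it and fail if it (or any field it references)
    // does not implement java.io.Serializable.
    static void ensureSerializable(Object o) throws IOException {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o); // throws NotSerializableException on a non-serializable field
        }
    }

    public static void main(String[] args) throws IOException {
        ensureSerializable("a plain String serializes fine"); // passes silently

        try {
            // java.lang.Object does not implement Serializable, so this fails,
            // just like a held GzipCodec instance would.
            ensureSerializable(new Object());
        } catch (NotSerializableException e) {
            System.out.println("not serializable: " + e.getMessage());
        }
    }
}
```

Running this prints `not serializable: java.lang.Object`, which matches the shape of the error I am seeing for the GzipCodec instance.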
Best regards
Eyal Peer