Object has non serializable fields

Posted by Eyal Pe'er on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Object-has-non-serializable-fields-tp33867.html

Hi all,

I am trying to write a sink function that receives strings and writes them to compressed files in time buckets.

The code is pretty straightforward and is based on CompressWriterFactoryTest:

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.compress.CompressWriterFactory;
import org.apache.flink.formats.compress.extractor.DefaultExtractor;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.formats.compress.CompressWriters;
import org.apache.hadoop.conf.Configuration;

// Writer factory: extracts each String record and compresses it with Hadoop's "Gzip" codec
CompressWriterFactory<String> writer = CompressWriters.forExtractor(new DefaultExtractor<String>())
                .withHadoopCompression("Gzip", new Configuration());

// Bulk-format sink that groups output files into date/time buckets
// (getTargetPath(), getDataTimeBucketFormat() and getDataTimeBucket() are my own helpers, not shown)
SinkFunction<String> sink = StreamingFileSink.forBulkFormat(new Path(getTargetPath()), writer)
                .withBucketAssigner(new DateTimeBucketAssigner<>(getDataTimeBucketFormat(getDataTimeBucket()))).build();

// dataStream is the DataStream<String> being written out
dataStream.addSink(sink);
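The intended behaviour (assign each record to a time bucket, then write it gzip-compressed) can be illustrated with the JDK alone. This is a minimal sketch, not Flink's implementation: the class BucketedGzipSketch is hypothetical, the hourly pattern yyyy-MM-dd--HH is an assumption (I believe it is DateTimeBucketAssigner's default, but getDataTimeBucketFormat(...) may return something else), and UTC is used only to keep the output deterministic.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical, JDK-only illustration of "compressed files in time buckets".
// It is NOT how StreamingFileSink or CompressWriterFactory are implemented.
final class BucketedGzipSketch {
    // Assumed hourly bucket pattern; UTC keeps the example deterministic.
    private static final DateTimeFormatter BUCKET_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    // Maps a timestamp (epoch millis) to the bucket (directory) name it belongs to.
    static String bucketFor(long epochMillis) {
        return BUCKET_FORMAT.format(Instant.ofEpochMilli(epochMillis));
    }

    // Writes each record as a newline-terminated UTF-8 line into one gzip stream.
    static byte[] gzipLines(Iterable<String> records) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
            for (String record : records) {
                gzip.write((record + "\n").getBytes(StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Decompresses the bytes again, e.g. to check what was written.
    static String gunzip(byte[] compressed) {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For example, bucketFor(0L) yields "1970-01-01--00", and gunzip(gzipLines(List.of("a", "b"))) yields "a\nb\n".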

When I tried to add it as a sink (dataStream.addSink), the application crashed with:

 

org.apache.hadoop.io.compress.GzipCodec@55e3d6c3 is not serializable. The object probably contains or references non serializable fields.
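As far as I understand, this message comes from Flink checking, when the sink is added, that the function can be Java-serialized (it has to be shipped to the workers as part of the job). The message shows that a GzipCodec instance is reachable from the sink and that this class does not implement java.io.Serializable. The same kind of failure can be reproduced in plain Java; ThirdPartyCodec, EagerCodecHolder and SerializabilityCheck are hypothetical names, and the check is only a rough approximation of what Flink does.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a third-party class that does not implement
// Serializable (it plays the role of GzipCodec in the error above).
final class ThirdPartyCodec {
    String name() {
        return "gzip";
    }
}

// The holder itself is Serializable, but one of its fields is not.
final class EagerCodecHolder implements Serializable {
    private static final long serialVersionUID = 1L;
    final ThirdPartyCodec codec = new ThirdPartyCodec();
}

final class SerializabilityCheck {
    // Approximates a pre-submission check: Java-serialize the whole object graph.
    // Returns null on success, otherwise the exception message (the offending class name).
    static String firstNonSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With EagerCodecHolder the check reports ThirdPartyCodec, while a plain String passes; that is the same shape of failure as in the message above.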

 

I guess I am using something incorrectly, but I am not sure what.

Or should I make the SinkFunction serializable myself? If so, how can I do that?
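A common general Java pattern for this kind of problem is to serialize only plain configuration (here a codec name) and mark the non-serializable object transient, recreating it lazily after deserialization. The sketch below shows the pattern with hypothetical classes (NamedCodec, LazyCodecHolder, JavaSerialization); it is not Flink's API and not a confirmed fix for CompressWriterFactory. In a real sink, the lazy creation would presumably happen on the task side, for example inside a custom BulkWriter.Factory#create(...) or a RichSinkFunction's open(...).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical non-serializable codec, created from a plain name.
final class NamedCodec {
    private final String name;

    NamedCodec(String name) {
        this.name = name;
    }

    String name() {
        return name;
    }
}

// Keeps only serializable configuration; the codec is transient and rebuilt lazily.
final class LazyCodecHolder implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String codecName;       // serialized with the object
    private transient NamedCodec codec;   // skipped by serialization, null after deserialization

    LazyCodecHolder(String codecName) {
        this.codecName = codecName;
    }

    // Creates the codec on first use, i.e. wherever the object is used after deserialization.
    NamedCodec codec() {
        if (codec == null) {
            codec = new NamedCodec(codecName);
        }
        return codec;
    }
}

final class JavaSerialization {
    // Serializes and deserializes a value, roughly what happens when a function is shipped.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Even if codec() has already been called before serialization, the round trip succeeds because the field is transient; on the copy, codec() rebuilds it from codecName on first use.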

Best regards

Eyal Peer