Object has non serializable fields

Eyal Pe'er

Hi all,

I am trying to write a sink function that receives strings and writes them to compressed files in time buckets.

The code is pretty straightforward and is based on CompressWriterFactoryTest:


import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.compress.CompressWriterFactory;
import org.apache.flink.formats.compress.extractor.DefaultExtractor;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.formats.compress.CompressWriters;
import org.apache.hadoop.conf.Configuration;

CompressWriterFactory<String> writer = CompressWriters.forExtractor(new DefaultExtractor<String>())
        .withHadoopCompression("Gzip", new Configuration());

StreamingFileSink.forBulkFormat(new Path(getTargetPath()), writer)
        .withBucketAssigner(new DateTimeBucketAssigner<>(getDataTimeBucketFormat(getDataTimeBucket())))
        .build();



When I tried to add it as a sink (dataStream.addSink) the app crashed due to:


org.apache.hadoop.io.compress.GzipCodec@55e3d6c3 is not serializable. The object probably contains or references non serializable fields.


I guess I am using something incorrectly, but I am not sure what.

Or maybe I should convert the SinkFunction to a serializable one, but how can I do that?

Best regards

Eyal Peer


Re: Object has non serializable fields

Kostas Kloudas-2
Hi Eyal,

This is a known issue that has now been fixed (see [1]); the fix will be
part of the next releases.


[1] https://issues.apache.org/jira/browse/FLINK-16371
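
For background on the error itself: Flink ships sink functions to the workers using Java serialization, so every non-transient field reachable from the sink must be Serializable. The sketch below is not the actual Flink or Hadoop code and does not claim to show how [1] was resolved. It only illustrates the general pattern with hypothetical stand-in classes (FakeCodec, EagerFactory, LazyFactory): holding a non-serializable codec instance in a field fails, while keeping only the codec name and creating the codec lazily in a transient field works.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializableFactoryDemo {

    // Hypothetical stand-in for a codec such as GzipCodec: it is NOT Serializable.
    static class FakeCodec {
        final String name;
        FakeCodec(String name) { this.name = name; }
    }

    // Problematic shape: the factory holds the codec instance directly.
    static class EagerFactory implements Serializable {
        private static final long serialVersionUID = 1L;
        private final FakeCodec codec;
        EagerFactory(String codecName) { this.codec = new FakeCodec(codecName); }
    }

    // Safer shape: only the codec name is serialized; the codec is built on first use.
    static class LazyFactory implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String codecName;
        private transient FakeCodec codec; // skipped by Java serialization

        LazyFactory(String codecName) { this.codecName = codecName; }

        FakeCodec codec() {
            if (codec == null) {
                codec = new FakeCodec(codecName);
            }
            return codec;
        }
    }

    // Returns true if Java serialization accepts the object, false if it hits
    // a non-serializable field.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("eager: " + canSerialize(new EagerFactory("Gzip"))); // eager: false
        System.out.println("lazy:  " + canSerialize(new LazyFactory("Gzip")));  // lazy:  true
    }
}
```

The same check (writing the object to an ObjectOutputStream) can be run against a real sink in a unit test to catch this kind of problem before the job is submitted.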

On Tue, Mar 24, 2020 at 11:10 AM Eyal Pe'er <[hidden email]> wrote:

> Hi all,
> I am trying to write a sink function that receives strings and writes them to compressed files in time buckets.
> The code is pretty straightforward and is based on CompressWriterFactoryTest:
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.formats.compress.CompressWriterFactory;
> import org.apache.flink.formats.compress.extractor.DefaultExtractor;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
> import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
> import org.apache.flink.formats.compress.CompressWriters;
> import org.apache.hadoop.conf.Configuration;
> CompressWriterFactory<String> writer = CompressWriters.forExtractor(new DefaultExtractor<String>())
>                 .withHadoopCompression("Gzip", new Configuration());
> StreamingFileSink.forBulkFormat(new Path(getTargetPath()), writer)
>                 .withBucketAssigner(new DateTimeBucketAssigner<>(getDataTimeBucketFormat(getDataTimeBucket()))).build();
> When I tried to add it as a sink (dataStream.addSink) the app crashed due to:
> org.apache.hadoop.io.compress.GzipCodec@55e3d6c3 is not serializable. The object probably contains or references non serializable fields.
> I guess I am using something incorrectly, but I am not sure what.
> Or maybe I should convert the SinkFunction to a serializable one, but how can I do that?
> Best regards
> Eyal Peer