Re: Serialization issues with DataStreamUtils

Posted by Vinay Patil on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Serialization-issues-with-DataStreamUtils-tp15139p15298.html

Hi,

After adding the following two lines the serialization trace does not show
the Schema related classes:

env.getConfig().registerTypeWithKryoSerializer(GenericData.Array.class,
Serializers.SpecificInstanceCollectionSerializerForArrayList.class);

env.getConfig().addDefaultKryoSerializer(Schema.class,Serializers.AvroSchemaSerializer.class);

However I still get exception for :
Serialization trace:
schema (org.apache.avro.generic.GenericData$Record)

The default Kyro serializer is not able to serialize GenericData.Record
class. 

Any other way I can get rid off this exception.

P.S I do not see this exception when I run the actual pipeline, this is only
coming in one of our test case

Regards,
Vinay Patil

On Sat, Aug 26, 2017 at 7:47 PM, vinay patil [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
Hi Robert,

The test case code is as follows:
GenericRecord testData = new GenericData.Record(avroSchema);
SingleOutputStreamOperator<GenericRecord> testStream = env.fromElements(testData)
                                                                                     .map(new DummyOperator(...));

Iterator<GenericRecord>iterator = DataStreamUtils.collect(testStream);

Here is the complete stack trace:

Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 229
Serialization trace:
reserved (org.apache.avro.Schema$NullSchema)
types (org.apache.avro.Schema$UnionSchema)
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
        at org.apache.flink.contrib.streaming.SocketStreamIterator.readNextFromStream(SocketStreamIterator.java:149)
        at org.apache.flink.contrib.streaming.SocketStreamIterator.hasNext(SocketStreamIterator.java:112)


To start a new topic under Apache Flink User Mailing List archive., email [hidden email]
To unsubscribe from Apache Flink User Mailing List archive., click here.
NAML