Serialization issues with DataStreamUtils


Vinay Patil
Hi Guys,

I am using DataStreamUtils for unit testing. Each test case succeeds when it is run individually, but I get the following error when all the tests are run together:

Serialization trace:
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
        at org.apache.flink.contrib.streaming.SocketStreamIterator.hasNext(SocketStreamIterator.java:114)

I tried to register the above classes, but it did not work. The error also occurs randomly: some tests fail while others pass.

What could be the issue?

Regards,
Vinay Patil
Re: Serialization issues with DataStreamUtils

rmetzger0
Hi Vinay,

Could you provide the full stack trace and the data types you are using in your streaming application? That would help us fully understand the problem.

Regards,
Robert


In reply to the post by vinay patil of Fri, Aug 25, 2017 at 6:51 PM.

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Serialization-issues-with-DataStreamUtils-tp15139.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Re: Serialization issues with DataStreamUtils

Vinay Patil
Hi Robert,

The test case code is as follows:
GenericRecord testData = new GenericData.Record(avroSchema);
SingleOutputStreamOperator<GenericRecord> testStream = env.fromElements(testData)
        .map(new DummyOperator(...));

Iterator<GenericRecord> iterator = DataStreamUtils.collect(testStream);

Here is the complete stack trace:

Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 229
Serialization trace:
reserved (org.apache.avro.Schema$NullSchema)
types (org.apache.avro.Schema$UnionSchema)
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
        at org.apache.flink.contrib.streaming.SocketStreamIterator.readNextFromStream(SocketStreamIterator.java:149)
        at org.apache.flink.contrib.streaming.SocketStreamIterator.hasNext(SocketStreamIterator.java:112)
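
A note on the message "Encountered unregistered class ID": for classes that are registered, Kryo writes a small integer ID in place of the class name, and the reading side has to resolve that ID in its own registry. The following is a minimal, self-contained sketch of that mechanism. It is a toy model, not Kryo's or Flink's actual code; `ToyClassRegistry` and its methods are hypothetical names invented for illustration. It assumes that the writer and the reader were set up with different registrations, which this thread does not confirm as the cause.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of ID-based class resolution. NOT Kryo's actual implementation. */
final class UnregisteredClassIdDemo {

    /** Hypothetical stand-in for a class resolver: maps classes to small integer IDs. */
    static final class ToyClassRegistry {
        private final Map<Class<?>, Integer> idByClass = new HashMap<>();
        private final Map<Integer, Class<?>> classById = new HashMap<>();
        private int nextId = 0;

        /** Registers a class under the next free ID, so registration order decides the ID. */
        void register(Class<?> type) {
            if (idByClass.containsKey(type)) {
                return; // already registered, keep the existing ID
            }
            idByClass.put(type, nextId);
            classById.put(nextId, type);
            nextId++;
        }

        /** Writer side: only the ID goes onto the stream, not the class name. */
        int writeClass(Class<?> type) {
            Integer id = idByClass.get(type);
            if (id == null) {
                throw new IllegalArgumentException("Class is not registered: " + type.getName());
            }
            return id;
        }

        /** Reader side: the ID must resolve in the reader's own registry. */
        Class<?> readClass(int id) {
            Class<?> type = classById.get(id);
            if (type == null) {
                throw new IllegalStateException("Encountered unregistered class ID: " + id);
            }
            return type;
        }
    }

    public static void main(String[] args) {
        // The writer knows three classes (IDs 0, 1, 2).
        ToyClassRegistry writer = new ToyClassRegistry();
        writer.register(String.class);
        writer.register(Integer.class);
        writer.register(StringBuilder.class);

        // The reader was configured separately and knows only two (IDs 0, 1).
        ToyClassRegistry reader = new ToyClassRegistry();
        reader.register(String.class);
        reader.register(Integer.class);

        int id = writer.writeClass(StringBuilder.class);
        try {
            reader.readClass(id);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running `main` prints `Encountered unregistered class ID: 2`. In this toy model the failure goes away as soon as both sides register the same classes in the same order.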
Re: Serialization issues with DataStreamUtils

Vinay Patil
Hi,

After adding the following two lines, the serialization trace no longer shows
the Schema-related classes:

env.getConfig().registerTypeWithKryoSerializer(GenericData.Array.class,
        Serializers.SpecificInstanceCollectionSerializerForArrayList.class);

env.getConfig().addDefaultKryoSerializer(Schema.class, Serializers.AvroSchemaSerializer.class);

However, I still get an exception for:
Serialization trace:
schema (org.apache.avro.generic.GenericData$Record)

The default Kryo serializer is not able to serialize the GenericData.Record
class.

Is there any other way to get rid of this exception?

P.S. I do not see this exception when I run the actual pipeline; it only
occurs in one of our test cases.
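
One plausible reading of why the Schema-related classes drop out of the trace while `GenericData$Record` remains: as far as I understand Kryo, a default serializer added for a type also applies to its subclasses, whereas a type-specific registration applies to that exact class only. The sketch below models just that lookup rule. It is a toy under stated assumptions, not Kryo's or Flink's code: `ToySerializerConfig`, `ToySchema`, `ToyRecordSchema`, `ToyRecord`, and the serializer names are all hypothetical stand-ins.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of serializer lookup: exact registrations first, then defaults by assignability. */
final class SerializerLookupDemo {

    // Hypothetical stand-ins for a schema base class, a schema subclass, and a record class.
    static class ToySchema {}
    static final class ToyRecordSchema extends ToySchema {}
    static final class ToyRecord {}

    static final class ToySerializerConfig {
        private final Map<Class<?>, String> exact = new LinkedHashMap<>();
        private final Map<Class<?>, String> defaults = new LinkedHashMap<>();

        /** Models a type-specific registration: applies to this exact class only. */
        void registerType(Class<?> type, String serializerName) {
            exact.put(type, serializerName);
        }

        /** Models a default serializer: applies to the class and everything assignable to it. */
        void addDefault(Class<?> type, String serializerName) {
            defaults.put(type, serializerName);
        }

        String serializerFor(Class<?> type) {
            String byExactType = exact.get(type);
            if (byExactType != null) {
                return byExactType;
            }
            for (Map.Entry<Class<?>, String> entry : defaults.entrySet()) {
                if (entry.getKey().isAssignableFrom(type)) {
                    return entry.getValue();
                }
            }
            return "field-by-field fallback";
        }
    }

    public static void main(String[] args) {
        ToySerializerConfig config = new ToySerializerConfig();
        config.addDefault(ToySchema.class, "schema-as-string serializer");

        // The subclass is covered by the default added for its base class.
        System.out.println(config.serializerFor(ToyRecordSchema.class));
        // The record class is unrelated to ToySchema, so it still gets the fallback.
        System.out.println(config.serializerFor(ToyRecord.class));
    }
}
```

In this model the record class keeps using the fallback until it gets a registration of its own, which mirrors the situation described above where only `GenericData$Record` is left in the trace.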



In reply to the post by vinay patil of Sat, Aug 26, 2017 at 7:47 PM.

Re: Serialization issues with DataStreamUtils

Vinay Patil
Can anyone please help me with this issue?

In reply to the post by Vinay Patil of Aug 31, 2017, 5:20 PM.