Hi Guys,
I am using DataStreamUtils for unit testing, the test case succeeds when it is run individually but I get the following error when all the tests are run: Serialization trace: fieldMap (org.apache.avro.Schema$RecordSchema) schema (org.apache.avro.generic.GenericData$Record) at org.apache.flink.contrib.streaming.SocketStreamIterator.hasNext(SocketStreamIterator.java:114) I tried to to register the above classes but it did not work. Also this error comes randomly for some tests while some test pass. What could be the issue ? Regards, Vinay Patil |
Hi Vinay, could you provide the full stack trace and the Data types you are using in your streaming application, to fully understand the problem? Regards, Robert On Fri, Aug 25, 2017 at 6:51 PM, vinay patil <[hidden email]> wrote: Hi Guys, |
Hi Robert,
The test case code is as follows: GenericRecord testData = new GenericData.Record(avroSchema); SingleOutputStreamOperator<GenericRecord> testStream = env.fromElements(testData) .map(new DummyOperator(...)); Iterator<GenericRecord>iterator = DataStreamUtils.collect(testStream); Here is the complete stack trace: Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 229 Serialization trace: reserved (org.apache.avro.Schema$NullSchema) types (org.apache.avro.Schema$UnionSchema) schema (org.apache.avro.Schema$Field) fieldMap (org.apache.avro.Schema$RecordSchema) schema (org.apache.avro.generic.GenericData$Record) at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250) at org.apache.flink.contrib.streaming.SocketStreamIterator.readNextFromStream(SocketStreamIterator.java:149) at org.apache.flink.contrib.streaming.SocketStreamIterator.hasNext(SocketStreamIterator.java:112) |
Hi,
After adding the following two lines the serialization trace does not show the Schema related classes: env.getConfig().registerTypeWithKryoSerializer(GenericData.Array.class, Serializers.SpecificInstanceCollectionSerializerForArrayList.class); env.getConfig().addDefaultKryoSerializer(Schema.class,Serializers.AvroSchemaSerializer.class); However I still get exception for : Serialization trace: schema (org.apache.avro.generic.GenericData$Record) The default Kyro serializer is not able to serialize GenericData.Record class. Any other way I can get rid off this exception. P.S I do not see this exception when I run the actual pipeline, this is only coming in one of our test case -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
In reply to this post by Vinay Patil
Hi, After adding the following two lines the serialization trace does not show the Schema related classes: env.getConfig().registerTypeWithKryoSerializer(GenericData.Array.class, Serializers.SpecificInstanceCollectionSerializerForArrayList.class); env.getConfig().addDefaultKryoSerializer(Schema.class,Serializers.AvroSchemaSerializer.class); However I still get exception for : Serialization trace: schema (org.apache.avro.generic.GenericData$Record) The default Kyro serializer is not able to serialize GenericData.Record class. Any other way I can get rid off this exception. P.S I do not see this exception when I run the actual pipeline, this is only coming in one of our test case Regards, Vinay Patil On Sat, Aug 26, 2017 at 7:47 PM, vinay patil [via Apache Flink User Mailing List archive.] <[hidden email]> wrote: Hi Robert, |
Can anyone please help me with this issue On Aug 31, 2017 5:20 PM, "Vinay Patil" <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |