Kryo Deserializer

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Kryo Deserializer

bdas77
Hello,

Having an issue with nested protobuf deserialization, event tried with register the class with Kryo like beloe but seems like no help ,  one of the options left for me is to write a custom serializer or convert the byte array to a Dictionary object .

val clazz = Class.forName("java.util.Collections$UnmodifiableCollection");
env.getConfig.registerTypeWithKryoSerializer(clazz,classOf[UnmodifiableCollectionsSerializer])



-----------
.ClickSchema$Click)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:112)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:42)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:116)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:156)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSer
------------------------

Thank you
~/Das
Reply | Threaded
Open this post in threaded view
|

Re: Kryo Deserializer

小多
Hi Biswajit,




Best regards,
Duo

On Sat, Jan 21, 2017 at 9:15 AM, Biswajit Das <[hidden email]> wrote:
Hello,

Having an issue with nested protobuf deserialization, event tried with register the class with Kryo like beloe but seems like no help ,  one of the options left for me is to write a custom serializer or convert the byte array to a Dictionary object .

val clazz = Class.forName("java.util.Collections$UnmodifiableCollection");
env.getConfig.registerTypeWithKryoSerializer(clazz,classOf[UnmodifiableCollectionsSerializer])



-----------
.ClickSchema$Click)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:112)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:42)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:116)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:156)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSer
------------------------



--

Programmer, Geek...