Kryo Serialization Issue

Kryo Serialization Issue

Darshan Singh
Hi,

I am using a map function on a data stream that has one column, i.e. a JSON string. The map function simply uses a Jackson mapper to convert the String to an ObjectNode and also assigns a key based on one of the values in the ObjectNode.

The code seems to work fine for 2-3 minutes as expected and then suddenly fails with the error below. I looked at the mailing list, and most of the related issues were reported as fixed in 1.5.0. I am using 1.6.0, so I am not sure what needs to be done.

I just wanted to know whether we need to write our own serializer for ObjectNode to fix this issue, or whether there is some setting we are missing.

Thanks

java.lang.IndexOutOfBoundsException: Index: 49, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:657)
at java.util.ArrayList.get(ArrayList.java:433)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:207)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)


Re: Kryo Serialization Issue

Rong Rong
This seems to be unrelated to the KryoSerializer issue from recent discussions [1], which has been fixed in 1.4.3, 1.5.0 and 1.6.0.
At a quick glance, this might have been caused by a corrupted message in your decoding, for example a malformed JSON string.
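
The trace is consistent with that reading: the failure is in MapReferenceResolver.getReadObject, where ArrayList.get is called with index 49 on an empty list, which suggests the deserializer interpreted some bytes as a back-reference to an object it had never read. Below is a toy model of that situation in plain Java. It is not Kryo's actual wire format or code; the class name, marker bytes and record layout are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Toy model of a reference-tracking deserializer (hypothetical, not Kryo's format). */
final class ReferenceReadSketch {

    // Objects already read from the current record, indexed by reference id.
    private final List<Object> seenObjects = new ArrayList<>();

    /**
     * Reads one entry: a marker byte (0 = new int value follows,
     * anything else = an int back-reference id follows).
     */
    Object readEntry(ByteBuffer in) {
        byte marker = in.get();
        if (marker == 0) {
            Integer value = in.getInt();
            seenObjects.add(value);
            return value;
        }
        int referenceId = in.getInt();
        // Throws IndexOutOfBoundsException if the id points at an object never read.
        return seenObjects.get(referenceId);
    }

    public static void main(String[] args) {
        // Well-formed record: a new value (7), then a back-reference to id 0.
        byte[] good = {0, 0, 0, 0, 7, 1, 0, 0, 0, 0};
        ReferenceReadSketch reader = new ReferenceReadSketch();
        ByteBuffer in = ByteBuffer.wrap(good);
        System.out.println(reader.readEntry(in)); // the new value
        System.out.println(reader.readEntry(in)); // the same value via back-reference

        // Misaligned or corrupted record: the bytes decode as "back-reference to id 49"
        // although nothing has been read yet.
        byte[] corrupted = {1, 0, 0, 0, 49};
        try {
            new ReferenceReadSketch().readEntry(ByteBuffer.wrap(corrupted));
        } catch (IndexOutOfBoundsException e) {
            System.out.println("corrupted stream: " + e.getMessage());
        }
    }
}
```

The point of the sketch is only that this kind of exception says the byte stream and the reader disagree about where a record starts or what it contains; it does not by itself tell you what corrupted the stream.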

--
Rong


On Wed, Aug 22, 2018 at 8:41 AM Darshan Singh <[hidden email]> wrote:

Re: Kryo Serialization Issue

Darshan Singh
Thanks. We ran into different errors and then realized that an OOM issue was causing different parts of the job to fail.
Flink was buffering too much data because we were reading too fast from the source. Reducing the read speed fixed the issue.
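
The thread does not say how the read speed was reduced. As one possible illustration, here is a minimal pacing helper in plain Java that a source loop could call before emitting each record. The class name and the records-per-second parameter are hypothetical, it is not a Flink API, and it is meant for a single source thread only.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

/** Minimal pacing helper (hypothetical): enforces a minimum gap between records. */
final class SourceThrottleSketch {

    private final long intervalNanos; // minimum spacing between two records
    private long nextFreeNanos;       // earliest time the next record may be emitted

    SourceThrottleSketch(long maxPerSecond) {
        if (maxPerSecond <= 0) {
            throw new IllegalArgumentException("maxPerSecond must be positive");
        }
        this.intervalNanos = TimeUnit.SECONDS.toNanos(1) / maxPerSecond;
        this.nextFreeNanos = System.nanoTime();
    }

    /** Blocks until the next record may be emitted. Not thread-safe. */
    void acquire() {
        long remaining;
        // parkNanos may return early, so re-check the clock in a loop.
        while ((remaining = nextFreeNanos - System.nanoTime()) > 0) {
            LockSupport.parkNanos(remaining);
        }
        nextFreeNanos = System.nanoTime() + intervalNanos;
    }

    public static void main(String[] args) {
        SourceThrottleSketch throttle = new SourceThrottleSketch(100); // at most 100 records/s
        long start = System.nanoTime();
        for (int i = 0; i < 5; i++) {
            throttle.acquire();
            // A real source would read and emit one record here.
        }
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("emitted 5 records in " + elapsedMillis + " ms");
    }
}
```

Pacing at the source keeps less data in flight at any moment, at the cost of throughput; it treats the symptom rather than the cause of the memory pressure.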

However, I am curious how to achieve the same with S3, apart from limiting the number of files read at the same time.

Thanks

On Sun, Aug 26, 2018 at 5:32 PM Rong Rong <[hidden email]> wrote:

Re: Kryo Serialization Issue

Zhijiang(wangzhijiang999)
Hi,

How did you reduce the speed to avoid this issue? Do you mean reducing the parallelism of the source or of the downstream tasks?
As far as I know, data buffering is managed by Flink's internal buffer pool and memory manager, so it should not cause an OOM issue.
I suspect the OOM may be caused by the temporary byte buffers in the record serializers. If the record size is large and the downstream parallelism is high, serialization may cause an OOM.
Could you show the stack trace of the OOM? If this is the case, the following [1] can solve it; it is a work in progress.
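
To make the concern concrete, here is a back-of-the-envelope estimate in plain Java. It assumes, following the description above, that each sending subtask keeps one temporary serialization buffer per downstream channel and that each buffer can grow to the size of the largest record. That model, the class name and all numbers are illustrative assumptions, not values taken from Flink or from this job.

```java
/** Rough worst-case estimate of temporary serializer buffer memory (illustrative model). */
final class SerializerBufferEstimate {

    /**
     * Assumes one temporary buffer per (sending subtask, downstream channel) pair,
     * each as large as the biggest record seen so far.
     */
    static long worstCaseBytes(long maxRecordBytes, int downstreamParallelism, int senderSubtasks) {
        long perSender = Math.multiplyExact(maxRecordBytes, (long) downstreamParallelism);
        return Math.multiplyExact(perSender, (long) senderSubtasks);
    }

    public static void main(String[] args) {
        long maxRecordBytes = 5L * 1024 * 1024; // 5 MiB records (made-up figure)
        int downstreamParallelism = 200;        // made-up figure
        int senderSubtasks = 4;                 // sending subtasks in one TaskManager (made-up figure)

        long bytes = worstCaseBytes(maxRecordBytes, downstreamParallelism, senderSubtasks);
        System.out.println(bytes / (1024 * 1024) + " MiB"); // prints "4000 MiB"
    }
}
```

Under these assumptions the temporary buffers alone could approach 4 GiB of heap, which is why large records combined with high downstream parallelism are worth checking first.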

Zhijiang

------------------------------------------------------------------
From: Darshan Singh <[hidden email]>
Sent: Tuesday, August 28, 2018 00:16
To: walterddr <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: Kryo Serialization Issue
