Possible Data Corruption?


Possible Data Corruption?

Philip Doctor

Dear Flink Users,

I have a Flink (v1.2.1) process I left running for the last five days. It aggregates a bit of state and exposes it via Queryable State. It ran correctly for the first 3 days. There were no code changes or data changes, but suddenly Queryable State got weird. The process logs the current value of the queryable state, and from the logs I discerned that the state was being aggregated correctly. However, the Queryable State that was returned could not be deserialized. Rather than the list of longs I expect, I get 2 bytes (0x57 0x02). It seemed quite clear that the state in the Task Manager was not the state I was getting out of Queryable State.

 

I next reasoned that my data was being checkpointed and that I could possibly restore it. So I restarted the process to recover from a checkpoint. At this point the process fails with the following error:

 

java.lang.IllegalStateException: Could not initialize keyed state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:666)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: Index: 28, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:653)
    at java.util.ArrayList.get(ArrayList.java:429)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:231)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:788)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:284)
    ... 6 more
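
For anyone reading the "Caused by" frames: the failing call is an ArrayList.get inside Kryo's MapReferenceResolver.getReadObject, reached from readReferenceOrNull. My reading (an assumption, not something confirmed in this thread) is that the reader decoded a back-reference id of 28 while its list of already-read objects was still empty, which is what you would expect if the bytes at that position are not what the reader thinks they are. Below is a toy Java model of that lookup. It is a sketch only, not Kryo's code, and the names ReferenceResolverSketch, ReadObjects, register, resolve and canResolve are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class ReferenceResolverSketch {

    // Toy stand-in for the reader-side table of objects already read from a stream.
    // All names here are hypothetical; this is not Kryo's implementation.
    static final class ReadObjects {
        private final List<Object> seen = new ArrayList<>();

        // Record an object as it is read and return the id later back-references use.
        int register(Object o) {
            seen.add(o);
            return seen.size() - 1;
        }

        // Look up a back-reference; an id that was never registered throws
        // IndexOutOfBoundsException from the underlying list.
        Object resolve(int id) {
            return seen.get(id);
        }
    }

    // Returns true if the id resolves, false if the lookup goes out of bounds.
    static boolean canResolve(ReadObjects table, int id) {
        try {
            table.resolve(id);
            return true;
        } catch (IndexOutOfBoundsException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Healthy case: the id was handed out while reading, so it resolves.
        ReadObjects healthy = new ReadObjects();
        int id = healthy.register("some state value");
        System.out.println("known id resolves: " + canResolve(healthy, id));

        // Failure case: the stream yields back-reference id 28 before anything was read.
        ReadObjects empty = new ReadObjects();
        System.out.println("id 28 on empty table resolves: " + canResolve(empty, 28));
    }
}
```

The point of the sketch is only that this exception is a symptom of the reader and the bytes disagreeing. It does not say whether the bytes were written wrongly or read wrongly.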

 

 

This looks to me like Flink has serialized the state incorrectly.

 

I was running Flink 1.2.1. After this happened I upgraded to Flink 1.3 so I could manually set the Kafka partition offset. I backed it up 5 days to replay all the data, and now everything is working fine again.

 

However, I’m more than a little worried. Was there a serialization bug fixed in 1.3? I don’t believe there’s anything in my code that could be causing such an issue, but is there something in my jobs that could make something like this happen? Is this a known bug? The fact that it not only results in bad data in the query but also appears to take down my disaster recovery plan makes me a bit nervous here.

 

Thanks for your time,

Phil

 


Re: Possible Data Corruption?

Ted Yu



Re: Possible Data Corruption?

Philip Doctor

Huge thank you!

 

From: Ted Yu <[hidden email]>
Date: Monday, June 19, 2017 at 9:19 PM
To: Philip Doctor <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: Possible Data Corruption?

 

See this thread:

http://search-hadoop.com/m/Flink/VkLeQm2nZm1Wa7Ny1?subj=Re+Painful+KryoException+java+lang+IndexOutOfBoundsException+on+Flink+Batch+Api+scala

which mentioned FLINK-6398 fixed in 1.2.2 / 1.3

 
