Dear Flink Users,

I have a Flink (v1.2.1) process that I left running for the last five days. It aggregates a bit of state and exposes it via Queryable State. It ran correctly for the first 3 days. There were no code changes or data changes, but suddenly Queryable State got weird. The process logs the current value of the queryable state, and from the logs I discerned that the state was still being aggregated correctly. However, the Queryable State that was returned could not be deserialized. Rather than the list of longs I expect, I get 2 bytes (0x57 0x02). It seemed quite clear that the state in the Task Manager was not the state I was getting out of Queryable State.

I next reasoned that my data was being checkpointed and that I could possibly restore it, so I restarted the process to recover from a checkpoint. At this point the process fails with the following error:

java.lang.IllegalStateException: Could not initialize keyed state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:666)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: Index: 28, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:653)
    at java.util.ArrayList.get(ArrayList.java:429)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:231)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:788)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:284)
    ... 6 more

This looks to me like Flink has serialized the state incorrectly.

I was running Flink 1.2.1. After this happened I upgraded to Flink 1.3 so that I could manually set the Kafka partition offset. I backed it up 5 days to replay all the data, and now everything is working fine again.

However, I'm more than a little worried. Was there a serialization bug fixed in 1.3? I don't believe there's anything in my code that could be causing such an issue, but is there something in my jobs that could make something like this happen? Is this a known bug? The fact that it not only results in bad data in the query but also appears to take down my disaster recovery plan makes me a bit nervous here.

Thanks for your time,
Phil
On Mon, Jun 19, 2017 at 5:53 PM, Philip Doctor <[hidden email]> wrote:
Huge thank you!

From: Ted Yu <[hidden email]>

See this thread:

http://search-hadoop.com/m/Flink/VkLeQm2nZm1Wa7Ny1?subj=Re+Painful+KryoException+java+lang+IndexOutOfBoundsException+on+Flink+Batch+Api+scala

which mentioned FLINK-6398, fixed in 1.2.2 / 1.3.