Hi,
We have Flink 1.4.2 in production, and we have started seeing the exception below consistently. Could someone help me understand the real issue happening here? I see that https://issues.apache.org/jira/browse/FLINK-8836 has fixed it, but since that needs an upgrade, we are exploring workarounds or other options. Please advise.

----
java.lang.IllegalStateException: Could not initialize operator state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 54, Size 1
Serialization trace:
topic (org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:339)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
    ... 6 more
----

Thank you,
Hi,
I am not sure that FLINK-8836 is related to the failure in the stack trace. You say you are using Flink in production; does that mean the job always worked and has only recently started to fail? From the stack trace, it looks like the arity of some Tuple type changed in some operator state. The number of tuple fields could have increased after a job restart; in that case Flink expects tuples with more fields than were stored in the checkpoint and fails. Such a change would require an explicit state migration. Could that be the case? When did the failure start to happen, and why was the operator state restored? Was there a job restart?
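
To make that concrete, here is a minimal sketch (hypothetical function and state names, not code from your job) of operator state holding a Tuple2. The tuple type fixed in the ListStateDescriptor is what the TupleSerializer uses on restore; if that type were widened to three fields between runs, the serializer would no longer match the bytes written by the previous checkpoint, and the restore would fail in initializeState, much like in your trace:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

/** Hypothetical operator; only meant to show where the tuple arity of operator state is fixed. */
public class OffsetTrackingMap extends RichMapFunction<String, String>
        implements CheckpointedFunction {

    private transient ListState<Tuple2<KafkaTopicPartition, Long>> offsets;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // The TypeInformation given here (and therefore the tuple arity) must match
        // what was written into the checkpoint that is being restored. Changing
        // Tuple2 to, say, Tuple3 between runs makes the restore fail.
        ListStateDescriptor<Tuple2<KafkaTopicPartition, Long>> descriptor =
                new ListStateDescriptor<>(
                        "offsets",
                        TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {}));
        offsets = context.getOperatorStateStore().getUnionListState(descriptor);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // offsets.clear(); offsets.add(Tuple2.of(partition, offset));  // write current positions
    }

    @Override
    public String map(String value) {
        return value;
    }
}

The FlinkKafkaConsumer stores its offsets in a very similar way, as Tuple2<KafkaTopicPartition, Long> in union list state, which is presumably why KafkaTopicPartition shows up in the serialization trace.

Best,
Andrey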
Thank you, Andrey. The arity of the job has not changed. The issue here is that the job will run for some time (with checkpointing enabled) and then at some point hits the above exception. The job keeps restarting afterwards. One thing that I want to point out here is that we have a custom *serialization schema* attached to *FlinkKafkaConsumer010*. After going through FLINK-8836, I suspect the real issue is Kryo instances being shared across threads.
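
To illustrate what I mean (hypothetical event type and class names, simplified, not our actual schema), a schema written along these lines keeps its Kryo instance transient and builds it lazily per deserialized copy, so no Kryo object is ever shared between consumer threads. This is the kind of workaround we are considering:

import java.io.IOException;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;

// (the DeserializationSchema import path can differ slightly across Flink versions)
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class MyEventSchema implements DeserializationSchema<MyEventSchema.MyEvent> {

    /** Hypothetical payload type, standing in for whatever the real schema produces. */
    public static class MyEvent {
        public String id;
        public long timestamp;
    }

    // Transient and built lazily: each parallel consumer subtask gets its own
    // deserialized copy of the schema and therefore its own Kryo instance,
    // so nothing Kryo-related is shared across threads.
    private transient Kryo kryo;

    private Kryo kryo() {
        if (kryo == null) {
            kryo = new Kryo();
            kryo.register(MyEvent.class);
        }
        return kryo;
    }

    @Override
    public MyEvent deserialize(byte[] message) throws IOException {
        try (Input input = new Input(message)) {
            return kryo().readObject(input, MyEvent.class);
        }
    }

    @Override
    public boolean isEndOfStream(MyEvent nextElement) {
        return false;
    }

    @Override
    public TypeInformation<MyEvent> getProducedType() {
        return TypeInformation.of(MyEvent.class);
    }
}

Thanks,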
The stack trace shows that the state is being restored, which has probably happened after a job restart. I am wondering why the job restarted after running for some time. Could you share the full job manager and task manager logs?