flink 1.4.2. java.lang.IllegalStateException: Could not initialize operator state backend

anaray
Hi,
We have Flink 1.4.2 in production, and we have started seeing the exception
below consistently. Could someone help me understand the real issue
here? I see that https://issues.apache.org/jira/browse/FLINK-8836
has fixed it, but since that requires an upgrade, we are exploring workarounds or
other options.
Please advise.
 
----
java.lang.IllegalStateException: Could not initialize operator state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 54, Size 1
Serialization trace:
topic (org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:339)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
    ... 6 more
       
----


Thank you,

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: flink 1.4.2. java.lang.IllegalStateException: Could not initialize operator state backend

Andrey Zagrebin-3
Hi,

I am not sure that FLINK-8836 is related to the failure in the stack trace.

You say you are using Flink in production. Does that mean it always worked and only recently started to fail?

From the stack trace, it looks like the arity of some Tuple type changed in some operator state. The number of tuple fields could have increased after a job restart. In that case, Flink expects tuples with more fields than are stored in the checkpoint, and the restore fails. Such a change would require an explicit state migration. Could that be the case? When did the failure start to happen, and why was the operator state restored? Was it a job restart?

Best,
Andrey
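
To make the arity hypothesis above concrete, here is a minimal sketch using only the JDK. It is a toy, not Flink's TupleSerializer or Kryo: the class `ArityMismatchDemo` and its methods are hypothetical names invented for illustration, and the assumption is simply that fields are written back to back with no arity header. A reader that expects more fields than were written runs past the end of the data. In this toy that shows up as an EOFException; a real serializer may fail differently, for example with the IndexOutOfBoundsException seen in the trace.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Toy illustration only; this is NOT Flink's TupleSerializer. */
final class ArityMismatchDemo {

    /** Writes the given fields back to back, with no arity header. */
    static byte[] writeFields(String... fields) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            for (String field : fields) {
                out.writeUTF(field);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads exactly `arity` fields; returns null if the bytes run out first. */
    static String[] readFields(byte[] data, int arity) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String[] fields = new String[arity];
            for (int i = 0; i < arity; i++) {
                fields[i] = in.readUTF();
            }
            return fields;
        } catch (EOFException e) {
            return null; // the reader expected more fields than were written
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] snapshot = writeFields("my-topic", "0");      // written with arity 2
        System.out.println(readFields(snapshot, 2) != null); // same arity: readable
        System.out.println(readFields(snapshot, 3) != null); // arity grew: restore fails
    }
}
```

Running `main` prints `true` and then `false`: the snapshot restores with the arity it was written with and fails once the reader expects a third field.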

Re: flink 1.4.2. java.lang.IllegalStateException: Could not initialize operator state backend

anaray
Thank you, Andrey. The arity of the job has not changed. The issue is that the
job runs for some time (with checkpointing enabled) and then hits the
exception above. After that, the job keeps restarting.

One thing I want to point out is that we have a custom *serialization
schema* attached to *FlinkKafkaConsumer010*. After going through
FLINK-8836, I wonder whether the real issue is Kryo instances being shared across
threads?

Thanks,
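
Kryo instances are not thread-safe, so the usual precaution against the kind of sharing suspected above is to give each thread its own serializer instance. The following is a rough JDK-only sketch of that pattern, not a fix for this job and not the Kryo API: `ScratchSerializer` is a hypothetical stand-in for any serializer that keeps mutable internal state, and `PerThreadSerializerDemo` is an illustrative name.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Toy stand-in for a stateful, non-thread-safe serializer; NOT the Kryo API. */
final class PerThreadSerializerDemo {

    /** Keeps a reusable scratch buffer, so one instance must not be shared across threads. */
    static final class ScratchSerializer {
        private final StringBuilder scratch = new StringBuilder();

        byte[] serialize(String topic, int partition) {
            scratch.setLength(0); // internal state is reused between calls
            scratch.append(topic).append('#').append(partition);
            return scratch.toString().getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Each thread lazily gets its own instance, so the scratch buffer is never shared. */
    private static final ThreadLocal<ScratchSerializer> SERIALIZER =
            ThreadLocal.withInitial(ScratchSerializer::new);

    /** Serializes `count` records on `threads` workers; returns how many came out wrong. */
    static int countCorruptRecords(int threads, int count) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                final int partition = i;
                Callable<Boolean> task = () -> {
                    byte[] bytes = SERIALIZER.get().serialize("my-topic", partition);
                    String decoded = new String(bytes, StandardCharsets.UTF_8);
                    return decoded.equals("my-topic#" + partition);
                };
                results.add(pool.submit(task));
            }
            int corrupt = 0;
            for (Future<Boolean> result : results) {
                if (!result.get()) {
                    corrupt++;
                }
            }
            return corrupt;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(countCorruptRecords(4, 1000)); // prints 0
    }
}
```

If the `ThreadLocal` were replaced by a single shared `ScratchSerializer`, concurrent calls could interleave on the same buffer and produce corrupt bytes intermittently, which would be consistent with a job that runs for a while before failing. Whether that is what happens here is not established in this thread.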








Re: flink 1.4.2. java.lang.IllegalStateException: Could not initialize operator state backend

Andrey Zagrebin-3
The stack trace shows that state is being restored, so the job had probably already restarted by that point. I am wondering why it restarted after running for some time. Could you share the full job manager and task manager logs?

On Thu, May 16, 2019 at 6:26 AM anaray wrote:
Thank you, Andrey. The arity of the job has not changed. The issue is that the
job runs for some time (with checkpointing enabled) and then hits the
exception above. After that, the job keeps restarting.

One thing I want to point out is that we have a custom *serialization
schema* attached to *FlinkKafkaConsumer010*. After going through
FLINK-8836, I wonder whether the real issue is Kryo instances being shared across
threads?

Thanks,






