StreamCorruptedException
Posted by
Sridhar Chellappa on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/StreamCorruptedException-tp15607.html
I am running Flink 1.3.0 against Kafka 0.10. I managed to bring the flink cluster up and have been running my flink CEP job for more than 3 hours when I see the following exception :
The messages consumed from Kafka are protobuf messages and I use a protobuf serializer. i have no clue as to where is this exception coming from. Can someone help?
java.lang.IllegalStateException: Could not initialize keyed state backend.
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:675)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:662)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2828)
at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2862)
at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2764)
at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:2196)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1838)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1203)
at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1161)
at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:948)
at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:839)
at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:473)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:354)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:771)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
... 6 more