Checkpointed state fails recovery

Varun Dhore
Hello Flink community,

I need help with the following issue. I am using a Flink 1.3 snapshot with an HA configuration and RocksDB as the state backend for checkpoints. I have 1 primary and 1 standby JM running in HA mode. When the leading JM crashes, the standby JM takes over just fine. However, the streaming job then fails recovery because of an exception while deserializing checkpointed state. This only happens for certain jobs, and I have not been able to narrow down the root cause in my application. The following exception is thrown by the job manager. Please advise.

Caused by: java.lang.RuntimeException: Could not instantiate type 'org.apache.flink.api.scala.typeutils.TraversableSerializer$TraversableSerializerConfigSnapshot' Most likely the constructor (or a member variable initialization) threw an exception.
        at org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:156)
        at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:432)
        at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:248)
        at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:204)
        at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerConfigSnapshot.read(TupleSerializerConfigSnapshot.java:61)
        at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:434)
        at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:248)
        at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:204)
        at org.apache.flink.runtime.state.KeyedBackendStateMetaInfoSnapshotReaderWriters$KeyedBackendStateMetaInfoReaderV3.readStateMetaInfo(KeyedBackendStateMetaInfoSnapshotReaderWriters.java:198)
        at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:130)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readMetaData(RocksDBKeyedStateBackend.java:1239)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:1322)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:1491)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:963)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:771)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
        ... 6 more
Caused by: java.lang.NullPointerException
        at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$TraversableSerializerConfigSnapshot.<init>(TraversableSerializer.scala:186)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$TraversableSerializerConfigSnapshot.<init>(TraversableSerializer.scala:189)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at java.lang.Class.newInstance(Class.java:442)
        at org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:143)
        ... 22 more
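
Reading the trace from the bottom up, the NullPointerException is raised inside a constructor chain that starts from a reflective, no-argument instantiation (InstantiationUtil.instantiate -> Class.newInstance -> two TraversableSerializerConfigSnapshot constructors -> CompositeTypeSerializerConfigSnapshot). A minimal sketch of how such a chain can fail is below. It is not Flink code: every class and method name in it (Serializer, CompositeSnapshot, ElementSnapshot, instantiate) is a hypothetical stand-in, and the assumption that the no-argument constructor passes a null element to a parent that dereferences it is only a guess based on the frames above.

```java
import java.lang.reflect.InvocationTargetException;
import java.util.Objects;

public class ReflectiveInstantiationSketch {

    /** Hypothetical stand-in for a nested serializer. */
    public interface Serializer {
        String snapshotConfiguration();
    }

    /** Hypothetical stand-in for a composite config snapshot base class. */
    public static class CompositeSnapshot {
        private final String[] nestedConfigs;

        public CompositeSnapshot(Serializer... nested) {
            Objects.requireNonNull(nested);
            nestedConfigs = new String[nested.length];
            for (int i = 0; i < nested.length; i++) {
                // Dereferences each element; a null element throws NullPointerException.
                nestedConfigs[i] = nested[i].snapshotConfiguration();
            }
        }
    }

    /** Hypothetical snapshot whose no-arg constructor delegates with a null element. */
    public static class ElementSnapshot extends CompositeSnapshot {
        // No-arg constructor, needed so the class can be created reflectively on restore.
        public ElementSnapshot() {
            this((Serializer) null);
        }

        public ElementSnapshot(Serializer element) {
            // Wrapped into a one-element varargs array: { null }.
            super(element);
        }
    }

    /** Reflective instantiation through the public no-arg constructor. */
    public static Object instantiate(Class<?> clazz) {
        try {
            return clazz.getDeclaredConstructor().newInstance();
        } catch (InvocationTargetException e) {
            // The constructor itself threw; surface its exception as the cause.
            throw new RuntimeException(
                    "Could not instantiate type '" + clazz.getName() + "'", e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(
                    "Could not instantiate type '" + clazz.getName() + "'", e);
        }
    }

    public static void main(String[] args) {
        try {
            instantiate(ElementSnapshot.class);
        } catch (RuntimeException e) {
            // The cause is the NullPointerException thrown in CompositeSnapshot's constructor.
            System.out.println(e.getMessage());
            System.out.println("cause: " + e.getCause().getClass().getName());
        }
    }
}
```

If the real classes follow this pattern, the failure would not depend on the contents of the checkpoint but on which serializers the job's state uses, which would be consistent with only certain jobs being affected.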

Thanks,
Varun