Error restarting job from Savepoint

Yashwant Ganti
Hello,

We are facing an error when restarting a job from a savepoint. We believe it is because one of the common classes used across all of our jobs was changed, and no serialVersionUID had been assigned to that class. The error we are facing is:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for SplittableDoFnOperator_60af72bbf6b3989cb3e849280faa23d8_(2/4) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:115)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:559)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:101)
    at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:181)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 11 more
Caused by: java.io.InvalidClassException: com.****.******.*******; local class incompatible: stream classdesc serialVersionUID = -7317586767482317266, local class serialVersionUID = -8797204481428423223
    at java.base/java.io.ObjectStreamClass.initNonProxy(Unknown Source)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
    at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at org.apache.beam.sdk.coders.SerializableCoder.decode(SerializableCoder.java:194)
    at org.apache.beam.sdk.coders.SerializableCoder.decode(SerializableCoder.java:54)
    at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceRestrictionCoder.decode(Read.java:669)
    at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceRestrictionCoder.decode(Read.java:642)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:118)
    at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
    at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:289)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:323)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:285)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:172)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:112)
    ... 17 more

The change was to have the failing class implement Serializable. My questions are:
  • What are our options now to get this job to restart? In the non-production environments we can delete the savepoint, but we really don't want to do that in production.
  • Are there any best practices or guidance to follow to avoid such issues in the future? Should we have just added a serialVersionUID to the class? Should we (or can we) write custom serializers/deserializers for this class? The class is just a factory that creates connection objects, so we normally don't think twice about this in standard Java applications.
Any help is appreciated.

Thanks!

Re: Error restarting job from Savepoint

JING ZHANG
Hi Yashwant Ganti,

> Caused by: java.io.InvalidClassException: com.****.******.*******; local class incompatible: stream classdesc serialVersionUID = -7317586767482317266, local class serialVersionUID = -8797204481428423223
1. Please try this: find com.****.******.*******, add `private static final long serialVersionUID = -7317586767482317266L;` (the old serialVersionUID, i.e. the "stream classdesc" value in the error message above), then repackage your jar and restart the job with the new jar.
2. Serializable classes must define a serialVersionUID; please see https://flink.apache.org/contributing/code-style-and-quality-java.html#java-serialization. Please add a serialVersionUID to every Serializable class, especially those that take part in checkpointing/savepointing.
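
Step 1 can be sketched as follows, assuming the masked class is a small connection factory; `ConnectionFactory` and its `url` field are hypothetical stand-ins for com.****.******.*******. The `L` suffix matters: -7317586767482317266 is outside the `int` range, so without it the declaration does not compile. Pinning the old UID only gets past the descriptor check; the class's fields still have to be compatible with what the savepoint holds.

```java
import java.io.Serializable;

// Hypothetical stand-in for the masked class com.****.******.*******;
// the name and field are illustrative only.
class ConnectionFactory implements Serializable {
    // Pin the UID recorded in the savepoint (the "stream classdesc" value in
    // the InvalidClassException). The trailing L is required because the
    // literal is outside the int range.
    private static final long serialVersionUID = -7317586767482317266L;

    private final String url; // hypothetical configuration field

    ConnectionFactory(String url) {
        this.url = url;
    }

    String getUrl() {
        return url;
    }
}
```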

Best,
JING ZHANG

On Thu, May 27, 2021 at 1:15 AM, Yashwant Ganti <[hidden email]> wrote:

Re: Error restarting job from Savepoint

Yun Tang
In reply to this post by Yashwant Ganti
Hi Ganti,

If you can ensure that the new class stays backward compatible with the previous class, you can try explicitly setting the serialVersionUID of the current class to -7317586767482317266.
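
Before restarting from the savepoint with the patched jar, it may be worth confirming which serialVersionUID the local class actually reports, since that is the "local class serialVersionUID" the JVM compares against the "stream classdesc" value. A minimal JDK-only sketch; `SerialCheck`, `localUid` and `roundTrip` are hypothetical helper names. `ObjectStreamClass.lookup` should report the same value as the JDK `serialver` tool (the explicit UID if declared, otherwise the computed one).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical helpers for checking a class before redeploying.
final class SerialCheck {
    private SerialCheck() {}

    // The UID the local JVM will compare with the UID stored in the savepoint.
    static long localUid(Class<?> type) {
        ObjectStreamClass desc = ObjectStreamClass.lookup(type);
        if (desc == null) {
            throw new IllegalArgumentException(type.getName() + " is not Serializable");
        }
        return desc.getSerialVersionUID();
    }

    // Plain Java serialization round trip; per the stack trace, Beam's
    // SerializableCoder decodes with ObjectInputStream.readObject as well.
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }
}
```

For example, a unit test asserting `SerialCheck.localUid(ConnectionFactory.class) == -7317586767482317266L` (with the real class in place of the hypothetical `ConnectionFactory`) would catch an accidental UID change before deployment. A round trip only shows that the new class can read its own output, not the bytes already in the savepoint.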

To avoid such issues in the future, you must set the serialVersionUID explicitly up front unless you use a customized serializer for those classes. An even better solution is to ensure the class's backward compatibility with a customized serializer, or to leverage Apache Avro.
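
The thread does not spell out what a "customized serializer" looks like. In plain Java serialization, one option is a private `writeObject`/`readObject` pair that writes an explicit format version, so later layout changes are handled in code instead of tripping the UID check. A minimal sketch, assuming the factory can be rebuilt from a URL and a timeout; `VersionedConnectionFactory`, its fields, the default timeout and the `String` placeholder connection are all hypothetical. Since the class is a connection factory, the live connection stays `transient` and is re-created lazily after restore. Flink `TypeSerializer`s, Beam coders and Avro schemas are framework-level alternatives and are not shown here.

```java
import java.io.IOException;
import java.io.InvalidObjectException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical factory whose serialized form is controlled explicitly.
class VersionedConnectionFactory implements Serializable {
    // Still needed: ObjectInputStream compares this UID before readObject runs.
    private static final long serialVersionUID = 1L;
    // Bump when the written layout changes; readObject keeps reading older layouts.
    private static final int FORMAT_VERSION = 2;
    // Hypothetical fallback for state written before the timeout existed.
    private static final int DEFAULT_TIMEOUT_MILLIS = 30_000;

    // All state is transient, so default serialization writes no fields;
    // writeObject/readObject below define the layout instead.
    private transient String url;         // hypothetical, present since version 1
    private transient int timeoutMillis;  // hypothetical, added in version 2
    private transient Object connection;  // live resource, never written

    VersionedConnectionFactory(String url, int timeoutMillis) {
        this.url = url;
        this.timeoutMillis = timeoutMillis;
    }

    String url() {
        return url;
    }

    int timeoutMillis() {
        return timeoutMillis;
    }

    // Re-created lazily after restore; a String stands in for a real client.
    synchronized Object connection() {
        if (connection == null) {
            connection = "connection-to-" + url;
        }
        return connection;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject(); // the serialization spec expects this call first
        out.writeInt(FORMAT_VERSION);
        out.writeUTF(url);
        out.writeInt(timeoutMillis);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        int version = in.readInt();
        if (version < 1 || version > FORMAT_VERSION) {
            throw new InvalidObjectException("Unsupported format version " + version);
        }
        url = in.readUTF();
        timeoutMillis = version >= 2 ? in.readInt() : DEFAULT_TIMEOUT_MILLIS;
    }
}
```

Switching a class that is already in a savepoint to this layout is itself a format change: the existing savepoint holds default-serialized fields, not a version number, so this readObject could not read it. Such a switch is best made at a point where that state can be restarted or migrated.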

You could refer to [1] for more details.


Best,
Yun Tang

From: Yashwant Ganti <[hidden email]>
Sent: Thursday, May 27, 2021 1:14
To: [hidden email] <[hidden email]>
Subject: Error restarting job from Savepoint
 