Problem restoring state

Shridhar Kulkarni
All,

We are getting the exception copied at the end of this post. It is thrown when a new Flink job is submitted and Flink tries to restore the previous state.

Environment:
    Flink version: 1.10.1
    State persistence: Hadoop 3.3
    ZooKeeper: 3.5.8
    Parallelism: 4

The job chains these DataStream transformations: ProcessFunction -> KeySelector -> ProcessFunction.
Inbound messages are partitioned by the key "sourceId", which also appears in the serialization trace of the exception. sourceId is a String and is unique.
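Because the stream is keyed by sourceId, Flink stores each key's state in a key group, and on restore each parallel subtask reads back only the key groups it owns (the KeyGroupPartitioner and StateTableByKeyGroupReaders frames in the trace below belong to that step). The plain-Java sketch below models the key -> key group -> subtask assignment. It assumes max parallelism was left at its default (128 for a parallelism of 4) and uses a stand-in hash rather than Flink's murmur hash, so the concrete numbers will not match a real job; the sourceId values are hypothetical.

```java
/**
 * Simplified model of keyed-state partitioning: key -> key group -> parallel subtask.
 * The hash is a stand-in, NOT Flink's murmur hash, so real indices will differ.
 */
final class KeyGroupSketch {
    // Assumption: max parallelism left at its default (128 for a parallelism of 4).
    static final int MAX_PARALLELISM = 128;
    static final int PARALLELISM = 4;

    // Stand-in bit mixer; masking the sign bit keeps the result non-negative.
    static int spread(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return h & Integer.MAX_VALUE;
    }

    // Key group of a sourceId, in [0, MAX_PARALLELISM).
    static int keyGroupFor(String sourceId) {
        return spread(sourceId.hashCode()) % MAX_PARALLELISM;
    }

    // Each subtask owns a contiguous range of key groups; result in [0, PARALLELISM).
    static int operatorIndexFor(int keyGroup) {
        return keyGroup * PARALLELISM / MAX_PARALLELISM;
    }

    public static void main(String[] args) {
        String[] sourceIds = {"sensor-a", "sensor-b", "sensor-c"}; // hypothetical IDs
        for (String sourceId : sourceIds) {
            int keyGroup = keyGroupFor(sourceId);
            // +1 to match the 1-based "(n/4)" subtask numbering in Flink's logs
            System.out.println(sourceId + " -> key group " + keyGroup
                    + " -> subtask (" + (operatorIndexFor(keyGroup) + 1) + "/" + PARALLELISM + ")");
        }
    }
}
```

The point is that a given sourceId always maps to the same key group, so its ThingState is always written and read by whichever subtask owns that group; a deserialization failure therefore shows up on the specific subtask, here (4/4), that owns the affected key groups.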
-------
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 109, Size: 10
Serialization trace:
sourceId (com.contineo.ext.flink.core.ThingState)
-------

We have overridden the "org.apache.flink.streaming.api.functions.ProcessFunction.open()" method.
Any help is appreciated.


Exception stack trace:

2021-01-19 19:59:56,934 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom Source -> Process -> Process (3/4) of job c957f40043721b5cab3161991999a7ed is not in state RUNNING but DEPLOYING instead. Aborting checkpoint.
2021-01-19 19:59:57,358 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Process -> Sink: Unnamed (4/4) (b2605627c2fffc83dd412b3e7565244d) switched from RUNNING to FAILED.
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for LegacyKeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(4/4) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 109, Size: 10
Serialization trace:
sourceId (com.contineo.ext.flink.core.ThingState)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:293)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:154)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
... 15 more
Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 10
at java.util.ArrayList.rangeCheck(ArrayList.java:659)
at java.util.ArrayList.get(ArrayList.java:435)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
... 24 more

Your help is highly appreciated,

Thanks,
Shridhar

Re: Problem restoring state

Timo Walther
Hi Shridhar,

the exception indicates that something is wrong with the object
serialization: Kryo is unable to deserialize the stored object.
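The innermost exception comes from Kryo's MapReferenceResolver: while reading, Kryo keeps a list of objects already read and resolves back-references by index into that list, so "Index: 109, Size: 10" means the reader treated a value as back-reference 109 while only 10 entries were in that list. That typically means the reader is out of step with the bytes, for example because ThingState or the Kryo registrations differ between the job that wrote the checkpoint and the job restoring it; this is an assumption, not something the thread confirms. The toy model below is not Kryo's real encoding; it only reproduces the effect with a writer and a reader that disagree on field order.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model (NOT Kryo's code) of a reference-tracking serializer.
 * A new string is written as -1 followed by its UTF value; a repeated string is
 * written as the index it received the first time (a back-reference).
 */
final class ToyReferenceResolver {

    /** Writer side: each record is laid out as (int count, String sourceId). */
    static byte[] write(int[] counts, String[] sourceIds) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            Map<String, Integer> seen = new HashMap<>();
            for (int i = 0; i < counts.length; i++) {
                out.writeInt(counts[i]);
                Integer ref = seen.get(sourceIds[i]);
                if (ref != null) {
                    out.writeInt(ref);                  // back-reference to an earlier string
                } else {
                    seen.put(sourceIds[i], seen.size());
                    out.writeInt(-1);                   // marker: a new object follows
                    out.writeUTF(sourceIds[i]);
                }
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Reader side. countFirst=true matches the writer's layout; countFirst=false
     * assumes (String sourceId, int count), i.e. a changed class layout.
     */
    static List<String> readRecords(byte[] data, int records, boolean countFirst) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            List<String> readObjects = new ArrayList<>(); // the resolver's table of read objects
            List<String> ids = new ArrayList<>();
            for (int i = 0; i < records; i++) {
                if (countFirst) in.readInt();              // count, where the writer put it
                int marker = in.readInt();
                String id;
                if (marker == -1) {
                    id = in.readUTF();
                    readObjects.add(id);
                } else {
                    id = readObjects.get(marker);          // bogus marker -> IndexOutOfBoundsException
                }
                ids.add(id);
                if (!countFirst) in.readInt();             // count, where the reader wrongly expects it
            }
            return ids;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = write(new int[] {109, 7}, new String[] {"sensor-a", "sensor-a"});
        System.out.println("matching layout: " + readRecords(data, 2, true));
        try {
            readRecords(data, 2, false);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("mismatched layout: " + e.getMessage());
        }
    }
}
```

With the mismatched layout the reader consumes the count 109 as if it were a reference marker and looks up index 109 in a still-empty table, which is the same shape of failure as the one in the trace.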

It might help to:

1) register a custom Kryo serializer in the ExecutionConfig, or

2) pass dedicated type information using the types from
org.apache.flink.api.common.typeinfo.Types, so that Kryo is no longer
needed as the "generic" fallback serializer.

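Option 2 only takes Kryo out of the picture if Flink can analyze ThingState as a POJO; otherwise it falls back to GenericTypeInfo, which uses Kryo. Flink's documented POJO rules are, roughly: the class is public and standalone (not a non-static inner class), it has a public no-argument constructor, and every non-static, non-transient field is either public or has a getFoo()/setFoo() pair. The plain-Java check below approximates those rules with reflection so you can see which field disqualifies a class. PojoRuleCheck, GoodState and BadState are hypothetical names; Flink's own type extraction remains the authoritative check.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper approximating Flink's POJO rules (simplified). */
final class PojoRuleCheck {

    /** Returns readable violations; an empty list means the class looks POJO-compatible. */
    static List<String> violations(Class<?> clazz) {
        List<String> problems = new ArrayList<>();
        if (!Modifier.isPublic(clazz.getModifiers())) {
            problems.add("class is not public");
        }
        if (clazz.isMemberClass() && !Modifier.isStatic(clazz.getModifiers())) {
            problems.add("class is a non-static inner class");
        }
        boolean publicNoArgCtor = false;
        for (Constructor<?> c : clazz.getConstructors()) {   // public constructors only
            if (c.getParameterCount() == 0) publicNoArgCtor = true;
        }
        if (!publicNoArgCtor) {
            problems.add("no public no-argument constructor");
        }
        // Walk the class hierarchy, since inherited fields are serialized too.
        for (Class<?> k = clazz; k != null && k != Object.class; k = k.getSuperclass()) {
            for (Field f : k.getDeclaredFields()) {
                int mod = f.getModifiers();
                if (Modifier.isStatic(mod) || Modifier.isTransient(mod) || f.isSynthetic()) continue;
                if (Modifier.isPublic(mod)) continue;
                if (!hasGetterAndSetter(clazz, f)) {
                    problems.add("field '" + f.getName() + "' is neither public nor has getter/setter");
                }
            }
        }
        return problems;
    }

    private static boolean hasGetterAndSetter(Class<?> clazz, Field f) {
        String suffix = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
        boolean getter = false;
        boolean setter = false;
        for (Method m : clazz.getMethods()) {   // public methods, including inherited ones
            boolean getterName = m.getName().equals("get" + suffix)
                    || (f.getType() == boolean.class && m.getName().equals("is" + suffix));
            if (getterName && m.getParameterCount() == 0 && m.getReturnType() == f.getType()) {
                getter = true;
            }
            if (m.getName().equals("set" + suffix) && m.getParameterCount() == 1
                    && m.getParameterTypes()[0] == f.getType()) {
                setter = true;
            }
        }
        return getter && setter;
    }

    // Hypothetical stand-ins for a state class like ThingState.
    public static class GoodState {
        public String sourceId;
        private long count;
        public GoodState() {}
        public long getCount() { return count; }
        public void setCount(long count) { this.count = count; }
    }

    public static class BadState {
        private final String sourceId;   // private, no getter/setter
        public BadState(String sourceId) { this.sourceId = sourceId; }
    }

    public static void main(String[] args) {
        System.out.println("GoodState: " + violations(GoodState.class));
        System.out.println("BadState:  " + violations(BadState.class));
    }
}
```

In the Flink job itself, `env.getConfig().disableGenericTypes()` makes Flink fail fast wherever it would fall back to Kryo, and the type can be passed explicitly, e.g. `new ValueStateDescriptor<>("thingState", Types.POJO(ThingState.class))` (the state name is hypothetical). For option 1 the call is `env.getConfig().registerTypeWithKryoSerializer(ThingState.class, MySerializer.class)`, where MySerializer is a hypothetical `com.esotericsoftware.kryo.Serializer` subclass. Either change is likely not compatible with state already written by the default Kryo serializer in an existing savepoint; check the Flink state schema evolution documentation before relying on it.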
I hope this helps.

Regards,
Timo

(In reply to Shridhar Kulkarni's message of 28.01.21 12:41.)
