Could not initialize keyed state backend on restart from checkpoint

Federico D'Ambrosio
Hello everyone,

While trying to restart a Flink job from an externalized checkpoint, I'm getting the following exception:

java.lang.IllegalStateException: Could not initialize keyed state backend.
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unable to restore keyed state [window-contents]. For memory-backed keyed state, the previous serializer of the keyed state must be present; the serializer could have been removed from the classpath, or its implementation have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:465)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
        ... 6 more


The operator chain that fails with this exception is "latest_time" -> "map_active_stream". It uses JodaDateTimeSerializer and looks like the following:

// Preprocessing with Aggregation to get only the most recent event
val airtrafficEvents = streamByID
  .window(TumblingEventTimeWindows.of(Time.seconds(20)))
  .maxBy("airTrafficEvent").name("latest_time").uid("latest_time")

// Sinks
val activeStream = airtrafficEvents
  .map(event => event.toMongoActiveJsValue).name("map_active_stream").uid("map_active_stream")
  .timeWindowAll(Time.seconds(10))
  .apply(new MongoWindow(MongoWritingType.UPDATE)).name("active_stream_window").uid("active_stream_window")
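
To make explicit what the "latest_time" stage computes, here is a toy model in plain Scala with no Flink dependency. It assumes, for illustration only, a hypothetical Event case class whose compared field is an epoch-millisecond timestamp (eventTimeMs); the real field types are not shown in the snippet. It also ignores watermarks and late data, which Flink's event-time windows do handle.

```scala
// Toy model (not Flink code) of keyed tumbling windows followed by maxBy:
// group events by key and by 20-second window, keep the "latest" event per group.
object LatestPerWindow {
  // Hypothetical event type: the real job's fields are not shown in the post.
  final case class Event(id: String, eventTimeMs: Long)

  // Start of the tumbling window containing `ts` (zero offset, non-negative
  // timestamps); the window covers [start, start + sizeMs).
  def windowStart(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs)

  // For each (key, window start), keep the event with the largest eventTimeMs.
  // Scala's maxBy returns the first maximal element on ties.
  def latestPerWindow(events: Seq[Event], sizeMs: Long): Map[(String, Long), Event] =
    events
      .groupBy(e => (e.id, windowStart(e.eventTimeMs, sizeMs)))
      .map { case (key, group) => key -> group.maxBy(_.eventTimeMs) }
}
```

For example, events for key "A1" at 1000 ms and 19999 ms fall into the window starting at 0 and only the 19999 ms event is kept, while an event at 20000 ms opens the next window.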

This exception occurred when restarting the job from an externalized checkpoint after rebuilding the uber-jar to remove a sink that was no longer needed, so I restarted with --allowNonRestoredState. I'd like to stress that the serializer has always been on the classpath, inside the uber-jar, and its implementation did not change between executions.

I reproduced this behaviour by commenting this sink in and out, rebuilding, and restarting the job from both a savepoint and an externalized checkpoint.
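
Restarting with --allowNonRestoredState relies on Flink matching checkpointed state to operators by operator ID, which is why the snippet sets explicit .uid(...) values. The toy model below is plain Scala, not Flink's implementation, and RestoreModel, restore and the uid "removed_sink" are hypothetical names. It sketches the matching rule: state whose uid is missing from the rebuilt job makes the restore fail unless the flag is set, in which case that state is skipped.

```scala
// Toy model (not Flink code) of matching checkpointed state to operators by uid.
object RestoreModel {
  sealed trait Outcome
  // assigned: uid -> state handed to an operator that still exists; skipped: dropped uids.
  final case class Restored(assigned: Map[String, String], skipped: Set[String]) extends Outcome
  // The restore is refused because some state has no matching operator.
  final case class Rejected(unmatched: Set[String]) extends Outcome

  // checkpointState: operator uid -> opaque state handle; jobUids: uids in the rebuilt job.
  def restore(checkpointState: Map[String, String],
              jobUids: Set[String],
              allowNonRestoredState: Boolean): Outcome = {
    val unmatched = checkpointState.keySet -- jobUids
    if (unmatched.nonEmpty && !allowNonRestoredState) Rejected(unmatched)
    else Restored(checkpointState.filter { case (uid, _) => jobUids.contains(uid) }, unmatched)
  }
}
```

In this model the flag only affects state with no matching uid; operators such as "latest_time" that kept their uid still receive their own state and must be able to deserialize it.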

Do you have any insight on this?

Cheers,
Federico D'Ambrosio