Hello everyone,

while trying to restart a Flink job from an externalized checkpoint, I'm getting the following exception:

    java.lang.IllegalStateException: Could not initialize keyed state backend.
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.io.IOException: Unable to restore keyed state [window-contents]. For memory-backed keyed state, the previous serializer of the keyed state must be present; the serializer could have been removed from the classpath, or its implementation have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:465)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
        ... 6 more

The task that fails with this exception is the operator chain "latest_time" -> "map_active_stream". It uses JodaDateTimeSerializer and looks like this:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    // Preprocessing with aggregation to keep only the most recent event per key
    val airtrafficEvents = streamByID
      .window(TumblingEventTimeWindows.of(Time.seconds(20)))
      .maxBy("airTrafficEvent").name("latest_time").uid("latest_time")

    // Sinks
    val activeStream = airtrafficEvents
      .map(event => event.toMongoActiveJsValue).name("map_active_stream").uid("map_active_stream")
      .timeWindowAll(Time.seconds(10))
      .apply(new MongoWindow(MongoWritingType.UPDATE)).name("active_stream_window").uid("active_stream_window")

This exception occurred when I restarted the job from an externalized checkpoint. I had rebuilt the uber-jar after removing a sink that was no longer needed, so I restarted with --allowNonRestoredState.

I'd like to stress that the serializer has always been on the classpath, inside the uber-jar, and its implementation did not change between executions. I reproduced this behaviour by commenting this sink in and out, rebuilding, and restarting the job from both a savepoint and an externalized checkpoint.

Do you have any insight into this?

Cheers,
Federico D'Ambrosio