I'm trying to restore savepoints that were made with Flink 1.3.1, but I'm getting the exception below. The few code changes that had to be done to switch to 1.4.0 don't seem to be related to this, and it seems like an internal issue of Flink. Is 1.4.0 supposed to be able to restore a savepoint that was made with 1.3.1?

java.lang.IllegalStateException: Tried to initialize restored TimerService with different serializers than those used to snapshot its state.
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.startTimerService(HeapInternalTimerService.java:153)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:102)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:881)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:222)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Hi Juho,
Could your key type have possibly changed / been modified across the upgrade?

Also, from the error trace, it seems like the failing restore is of a window operator. What window type are you using?

That exception is a result of either mismatching key serializers or namespace serializers (i.e. a window serializer), so the above info should help us narrow down the issue here.

Cheers,
Gordon

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Thanks, the window operator is just:

    .timeWindow(Time.seconds(10))

We haven't changed key types.

I tried debugging this issue in the IDE and found the root cause to be this:

    !this.keyDeserializer.equals(keySerializer) -> true
    => throw new IllegalStateException("Tried to initialize restored TimerService with different serializers than those used to snapshot its state.");

This is in HeapInternalTimerService#startTimerService. With the debugger I can see this:

    keySerializer = org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@45f8043a
    keyDeserializer = org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@45f8043a
    namespaceDeserializer = org.apache.flink.streaming.api.windowing.windows.TimeWindow$Serializer@69d3cf7e
    this.keySerializer = null
    this.keyDeserializer = org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@e26116cd
    this.namespaceDeserializer = null

Now, as the problematic difference comes from the comparison of this.keyDeserializer and keySerializer, some further details on those:

    this.keyDeserializer.type = java.lang.Object (java.lang.class@325)
    keySerializer.type = java.lang.Object (java.lang.class@325)

I dug deeper into KryoSerializer#equals and found this condition to be the one that fails:

    Objects.equals(this.kryoRegistrations, other.kryoRegistrations) -> false

That takes me down to KryoRegistration#equals:

    this.registeredClass = class org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroRegisteredClass
    other.registeredClass = class org.apache.avro.generic.GenericData$Array
    this.serializerDefinitionType = INSTANCE
    other.serializerDefinitionType = CLASS
    this.serializerClass = null
    other.serializerClass = class org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass

Weird, huh? I can't see how I would've changed anything related to these when making the minor code changes required for upgrading to 1.4.

Cheers,
Juho

On Fri, Jan 12, 2018 at 2:58 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
> Hi Juho,
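To make the failing comparison easier to follow, here is a minimal, self-contained sketch of the same situation. It is not Flink code: `Registration` and `SketchSerializer` are hypothetical stand-ins for KryoRegistration and KryoSerializer, and keying the registration map by a class name is an assumption made for illustration. The field values are the ones from the debugger output above. It only shows how two serializers for the same type can still be unequal once one registration entry differs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

public class RegistrationMismatchSketch {

    // Assumed map key for the Avro-related registration (illustrative only).
    static final String AVRO_KEY = "org.apache.avro.generic.GenericData$Array";

    /** Hypothetical stand-in for one Kryo registration entry (not Flink's class). */
    public static final class Registration {
        public final String registeredClass;
        public final String definitionType;  // "INSTANCE" or "CLASS"
        public final String serializerClass; // may be null

        public Registration(String registeredClass, String definitionType, String serializerClass) {
            this.registeredClass = registeredClass;
            this.definitionType = definitionType;
            this.serializerClass = serializerClass;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Registration)) {
                return false;
            }
            Registration other = (Registration) o;
            return Objects.equals(registeredClass, other.registeredClass)
                    && Objects.equals(definitionType, other.definitionType)
                    && Objects.equals(serializerClass, other.serializerClass);
        }

        @Override
        public int hashCode() {
            return Objects.hash(registeredClass, definitionType, serializerClass);
        }
    }

    /** Hypothetical stand-in for a serializer whose equality includes its registrations. */
    public static final class SketchSerializer {
        public final Class<?> type;
        public final Map<String, Registration> registrations;

        public SketchSerializer(Class<?> type, Map<String, Registration> registrations) {
            this.type = type;
            this.registrations = registrations;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof SketchSerializer)) {
                return false;
            }
            SketchSerializer other = (SketchSerializer) o;
            // Same shape as the failing check: type AND registrations must match.
            return type == other.type && Objects.equals(registrations, other.registrations);
        }

        @Override
        public int hashCode() {
            return Objects.hash(type, registrations);
        }
    }

    /** The "this" side of the debugger output. */
    public static SketchSerializer thisSide() {
        Map<String, Registration> regs = new LinkedHashMap<>();
        regs.put(AVRO_KEY, new Registration("Serializers$DummyAvroRegisteredClass", "INSTANCE", null));
        return new SketchSerializer(Object.class, regs);
    }

    /** The "other" side of the debugger output. */
    public static SketchSerializer otherSide() {
        Map<String, Registration> regs = new LinkedHashMap<>();
        regs.put(AVRO_KEY, new Registration("org.apache.avro.generic.GenericData$Array", "CLASS",
                "Serializers$DummyAvroKryoSerializerClass"));
        return new SketchSerializer(Object.class, regs);
    }

    public static void main(String[] args) {
        SketchSerializer a = thisSide();
        SketchSerializer b = otherSide();
        System.out.println("same type: " + (a.type == b.type)); // prints "same type: true"
        System.out.println("equal: " + a.equals(b));            // prints "equal: false"
    }
}
```

Both sides serialize java.lang.Object, so the key type really is unchanged; the strict equals check fails purely because of the differing Avro-related registration.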
Thanks a lot for looking into this in so much detail, Juho! It was super helpful.

In short: this is indeed a bug in Flink. The HeapInternalTimerService should be performing compatibility checks on the restored / provided serializers and reconfiguring the serializers if possible, instead of just doing an equals check.

I think the problem only surfaced now with Flink out-of-the-box because in Flink 1.4 we changed how we treat Avro dependencies, which affected the default KryoSerializer registrations.

I've filed a JIRA for this issue: https://issues.apache.org/jira/browse/FLINK-8421.
The issue has been made a blocker for 1.4.1, so we should expect it to be fixed in the next bugfix release.

Unfortunately, I don't think there is an easy workaround for the issue at the moment.

Best,
Gordon

On 12 January 2018 at 11:07:18 PM, Juho Autio ([hidden email]) wrote:
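As a rough illustration of the difference between an equals check and a compatibility check with reconfiguration, here is a minimal sketch. It is not the FLINK-8421 fix and does not use Flink's API: `Result`, `SketchSerializer` and `ensureCompatible` are hypothetical names, and modelling registrations as an ordered list of class names is an assumption made to keep the example small.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CompatibilityCheckSketch {

    /** Hypothetical outcome of a compatibility check (not a Flink type). */
    public enum Result { COMPATIBLE, COMPATIBLE_AFTER_RECONFIGURATION, INCOMPATIBLE }

    /** Hypothetical serializer: a target type plus an ordered list of registrations. */
    public static final class SketchSerializer {
        public final Class<?> type;
        public List<String> registrations;

        public SketchSerializer(Class<?> type, List<String> registrations) {
            this.type = type;
            this.registrations = new ArrayList<>(registrations);
        }
    }

    /**
     * Instead of requiring restored.equals(provided), accept the provided serializer
     * whenever it can be reconfigured to agree with the restored one.
     */
    public static Result ensureCompatible(SketchSerializer restored, SketchSerializer provided) {
        if (!restored.type.equals(provided.type)) {
            // A different key or namespace type is a real incompatibility.
            return Result.INCOMPATIBLE;
        }
        if (restored.registrations.equals(provided.registrations)) {
            return Result.COMPATIBLE;
        }
        // Reconfigure: keep the restored registrations first, in their original order,
        // then append whatever the provided serializer registers in addition.
        List<String> merged = new ArrayList<>(restored.registrations);
        for (String registration : provided.registrations) {
            if (!merged.contains(registration)) {
                merged.add(registration);
            }
        }
        provided.registrations = merged;
        return Result.COMPATIBLE_AFTER_RECONFIGURATION;
    }

    public static void main(String[] args) {
        SketchSerializer restored = new SketchSerializer(Object.class,
                Arrays.asList("DummyAvroRegisteredClass"));
        SketchSerializer provided = new SketchSerializer(Object.class,
                Arrays.asList("GenericData$Array"));
        System.out.println(ensureCompatible(restored, provided));
        System.out.println(provided.registrations);
    }
}
```

With a strict equals check, the two serializers in `main` would be rejected even though they target the same type; with a check of this shape, only a genuine type change ends the restore.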