Restoring 1.3.1 savepoint in 1.4.0 fails in TimerService


Restoring 1.3.1 savepoint in 1.4.0 fails in TimerService

Juho Autio
I'm trying to restore savepoints that were made with Flink 1.3.1, but I'm getting the exception below. The few code changes I had to make to switch to 1.4.0 don't seem related to this, so it looks like an internal Flink issue. Is 1.4.0 supposed to be able to restore a savepoint that was made with 1.3.1?

java.lang.IllegalStateException: Tried to initialize restored TimerService with different serializers than those used to snapshot its state.
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.startTimerService(HeapInternalTimerService.java:153)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:102)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:881)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:222)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)


Re: Restoring 1.3.1 savepoint in 1.4.0 fails in TimerService

Tzu-Li (Gordon) Tai
Hi Juho,

Could your key type have possibly changed / been modified across the
upgrade?
Also, from the error trace, it seems like the failing restore is of a window
operator. What window type are you using?

That exception is a result of either mismatching key serializers or
namespace serializers (i.e. a window serializer), so the above info should
help us narrow down the issue here.

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Restoring 1.3.1 savepoint in 1.4.0 fails in TimerService

Juho Autio
Thanks, the window operator is just:

.timeWindow(Time.seconds(10))

We haven't changed key types.



I tried debugging this issue in the IDE and found the root cause to be this:

!this.keyDeserializer.equals(keySerializer) -> true
=> throw new IllegalStateException("Tried to initialize restored TimerService with different serializers than those used to snapshot its state.");

This is in HeapInternalTimerService#startTimerService.
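To make the failure mode concrete, here is a minimal Java sketch of an equals-based check like the one above. It is a hypothetical, simplified model, not Flink's code: `StrictTimerServiceSketch` and its `Serializer` record are made-up stand-ins, and it assumes a serializer counts as "equal" only when both its type and its configuration (e.g. its Kryo registrations) match.

```java
// Hypothetical, simplified model of an equals-based serializer check
// like the one in HeapInternalTimerService#startTimerService. Not Flink code.
final class StrictTimerServiceSketch {

    /** Stand-in serializer: equal only if the handled type AND the configuration match exactly. */
    record Serializer(String type, String config) {}

    /** Key serializer restored from the savepoint (plays the role of this.keyDeserializer). */
    private final Serializer restoredKeyDeserializer;

    StrictTimerServiceSketch(Serializer restoredKeyDeserializer) {
        // Assumption for this sketch: null means nothing was restored.
        this.restoredKeyDeserializer = restoredKeyDeserializer;
    }

    /** Rejects any provided key serializer that is not equal to the restored one. */
    void startTimerService(Serializer keySerializer) {
        if (restoredKeyDeserializer != null && !restoredKeyDeserializer.equals(keySerializer)) {
            throw new IllegalStateException(
                "Tried to initialize restored TimerService with different serializers "
                    + "than those used to snapshot its state.");
        }
    }
}
```

With this kind of check, two serializers for the same key type (`java.lang.Object` here) are still rejected if anything at all in their configuration differs.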



With the debugger I can see this:

keySerializer = org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@45f8043a
keyDeserializer = org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@45f8043a
namespaceDeserializer = org.apache.flink.streaming.api.windowing.windows.TimeWindow$Serializer@69d3cf7e

this.keySerializer = null
this.keyDeserializer = org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@e26116cd
this.namespaceDeserializer = null

Now, since the problematic difference comes from comparing this.keyDeserializer and keySerializer, here are some further details on those:
this.keyDeserializer.type = java.lang.Object (java.lang.class@325)
keySerializer.type = java.lang.Object (java.lang.class@325)

I dug deeper into KryoSerializer#equals and found that this is the condition that fails:

Objects.equals(this.kryoRegistrations, other.kryoRegistrations) -> false

That takes me down to KryoRegistration#equals:

this.registeredClass = class org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroRegisteredClass
other.registeredClass = class org.apache.avro.generic.GenericData$Array

this.serializerDefinitionType = INSTANCE
other.serializerDefinitionType = CLASS

this.serializerClass = null
other.serializerClass = class org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass
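The comparison above can be modelled roughly like this. The sketch assumes a registration equals another only if registeredClass, the definition type and serializerClass all match; `RegistrationMismatchSketch`, `DefinitionType` and `firstMismatch` are hypothetical names, not Flink's KryoRegistration API, and the field values are copied from the debugger output above.

```java
import java.util.List;

// Hypothetical stand-ins for the registration fields compared above; not Flink's KryoRegistration.
final class RegistrationMismatchSketch {

    /** Mirrors the two serializerDefinitionType values seen in the debugger. */
    enum DefinitionType { CLASS, INSTANCE }

    /** Record equality compares all three components, like a field-by-field equals. */
    record Registration(String registeredClass, DefinitionType definitionType, String serializerClass) {}

    /** The "this" side of the comparison (the restored key deserializer). */
    static final Registration RESTORED = new Registration(
        "org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroRegisteredClass",
        DefinitionType.INSTANCE,
        null);

    /** The "other" side (the key serializer passed in by the running job). */
    static final Registration PROVIDED = new Registration(
        "org.apache.avro.generic.GenericData$Array",
        DefinitionType.CLASS,
        "org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass");

    /** Debugging helper: index of the first differing registration, or -1 if both lists match. */
    static int firstMismatch(List<Registration> a, List<Registration> b) {
        int n = Math.min(a.size(), b.size());
        for (int i = 0; i < n; i++) {
            if (!a.get(i).equals(b.get(i))) {
                return i;
            }
        }
        return a.size() == b.size() ? -1 : n;
    }
}
```

With these values, `List.of(RESTORED).equals(List.of(PROVIDED))` is false and `firstMismatch` returns 0, the same outcome as the failing Objects.equals(this.kryoRegistrations, other.kryoRegistrations) check.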

Weird, huh? I can't see how I would have changed anything related to these when making the minor code changes required for upgrading to 1.4.

Cheers,
Juho

On Fri, Jan 12, 2018 at 2:58 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:

Re: Restoring 1.3.1 savepoint in 1.4.0 fails in TimerService

Tzu-Li (Gordon) Tai
Thanks a lot for looking into this with so much detail, Juho! It was super helpful.

In short: this is indeed a bug in Flink.
The HeapInternalTimerService should perform compatibility checks on the restored / provided serializers and reconfigure them where possible, instead of relying on a plain equals check.
I think the problem only surfaced now with Flink out-of-the-box because in Flink 1.4 we changed how we treat Avro dependencies, which affected the default KryoSerializer registrations.
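A rough sketch of what "check compatibility and reconfigure" could look like, as opposed to a plain equals check. Everything here is hypothetical (`CompatibilityCheckSketch`, `ensureCompatibility`, the `Result` values) and is not the actual FLINK-8421 fix. It assumes that differing registrations for the same key type can be reconciled by keeping the restored registrations in their original order and appending any that only the new serializer has, and that only a different key type is a real incompatibility.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "check compatibility, reconfigure if possible" instead of equals().
// None of these types are Flink classes; they only illustrate the idea.
final class CompatibilityCheckSketch {

    enum Result { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_RECONFIGURATION, INCOMPATIBLE }

    /** One Kryo-style registration; its position in the list determines its ID. */
    record Registration(String registeredClass, String serializer) {}

    /** Stand-in serializer: the handled type plus its ordered registrations. */
    record KryoLikeSerializer(String type, List<Registration> registrations) {}

    record Outcome(Result result, KryoLikeSerializer serializerToUse) {}

    static Outcome ensureCompatibility(KryoLikeSerializer restored, KryoLikeSerializer provided) {
        if (!restored.type().equals(provided.type())) {
            // A different key type cannot be reconciled.
            return new Outcome(Result.INCOMPATIBLE, null);
        }
        if (restored.registrations().equals(provided.registrations())) {
            return new Outcome(Result.COMPATIBLE_AS_IS, provided);
        }
        // Keep restored registrations first, in their original order, so IDs written
        // to the savepoint still resolve; then append registrations only the new serializer has.
        List<Registration> merged = new ArrayList<>(restored.registrations());
        for (Registration r : provided.registrations()) {
            boolean known = merged.stream()
                .anyMatch(m -> m.registeredClass().equals(r.registeredClass()));
            if (!known) {
                merged.add(r);
            }
        }
        return new Outcome(Result.COMPATIBLE_AFTER_RECONFIGURATION,
            new KryoLikeSerializer(provided.type(), List.copyOf(merged)));
    }

    /** Timer-service start that fails only on a genuine incompatibility. */
    static KryoLikeSerializer startTimerService(KryoLikeSerializer restored, KryoLikeSerializer provided) {
        Outcome outcome = ensureCompatibility(restored, provided);
        if (outcome.result() == Result.INCOMPATIBLE) {
            throw new IllegalStateException("Key serializer is incompatible with the restored one.");
        }
        return outcome.serializerToUse();
    }
}
```

Keeping the restored order first is the key design choice in this model: Kryo assigns registration IDs in registration order by default, so reordering them could make previously written state unreadable.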

I’ve filed a JIRA for this issue: https://issues.apache.org/jira/browse/FLINK-8421. It is marked as a blocker for 1.4.1, so we should expect it to be fixed in the next bugfix release.
Unfortunately, I don’t think there is an easy workaround for the issue at the moment.

Best,
Gordon

On 12 January 2018 at 11:07:18 PM, Juho Autio ([hidden email]) wrote:
