Problem with savepoint deserialization

Steven Nelson
Hello! We are working with a Scala-based pipeline.

We changed 

case class Record(orgId: Int)

To

case class Record(orgId: Int, operationId: Option[String] = None)

And now our savepoints fail with this exception:
org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
    at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getOrCreateKeyedState(AbstractStreamOperator.java:577)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:240)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)

I was under the impression we could add items to case classes and still be able to use existing state to start the job.

-Steve
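
To illustrate the failure mode, here is a minimal sketch in plain Scala, assuming a serializer that writes fields positionally with no field names or defaults in the bytes. It is not Flink's serializer or its wire format; RecordV1, RecordV2, writeV1 and readV2 are hypothetical names made up for this example. Bytes written for the old shape run out when they are read with the new shape, so the `= None` default never gets a chance to apply.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, EOFException}

object LayoutMismatchSketch {
  // Old and new shapes of the record from the post, renamed so both can coexist here.
  final case class RecordV1(orgId: Int)
  final case class RecordV2(orgId: Int, operationId: Option[String] = None)

  // Hypothetical positional writer for the old shape: only the Int is written.
  def writeV1(r: RecordV1): Array[Byte] = {
    val buf = new ByteArrayOutputStream()
    val out = new DataOutputStream(buf)
    out.writeInt(r.orgId)
    out.flush()
    buf.toByteArray
  }

  // Hypothetical positional reader for the new shape:
  // the Int, then a presence flag, then the string if the flag is set.
  def readV2(bytes: Array[Byte]): RecordV2 = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val orgId = in.readInt()
    val present = in.readBoolean() // throws EOFException on bytes produced by writeV1
    val operationId = if (present) Some(in.readUTF()) else None
    RecordV2(orgId, operationId)
  }

  // True when bytes in the old layout cannot be read with the new layout.
  def oldBytesBreakNewReader(orgId: Int): Boolean =
    try { readV2(writeV1(RecordV1(orgId))); false }
    catch { case _: EOFException => true }
}
```

In this toy model the new reader has no way to tell that the second field was never written, which is why some explicit notion of a layout version or migration step is needed.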

Re: Problem with savepoint deserialization

Aleksandar Mastilovic
The Option class is not serializable. If you put something serializable into that case class, you wouldn’t have problems.

On Oct 8, 2019, at 8:17 AM, Steven Nelson <[hidden email]> wrote: [full quote of the original post trimmed]

Re: Problem with savepoint deserialization

Steven Nelson
Are you sure? I just restarted the job with the new version, but not from a savepoint, then took a new savepoint, and it seemed to work from there. It just seemed like it couldn’t upgrade the schema during restore.

On Oct 8, 2019, at 1:22 PM, Aleksandar Mastilovic <[hidden email]> wrote: [quoted messages trimmed]

Re: Problem with savepoint deserialization

Aleksandar Mastilovic
I’m pretty sure java.util.Optional is not serializable: https://stackoverflow.com/questions/24547673/why-java-util-optional-is-not-serializable-how-to-serialize-the-object-with-suc

However, on a second look, I can now see you’re using Scala’s Option, which IS serializable :) My apologies for that.

So your problem was that you had the previous version of the class in your savepoint, which of course cannot be deserialized into the new version without custom code that handles a missing Option.
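
A rough sketch of what such custom code could look like, in plain Scala and under the assumption that every payload starts with a layout version byte. The version tags, writeV1, writeV2 and read are hypothetical names for illustration only; this is not Flink's API and not how Flink lays out savepoint data.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object VersionedReadSketch {
  final case class Record(orgId: Int, operationId: Option[String] = None)

  // Hypothetical layout versions; each payload starts with one version byte.
  final val V1 = 1 // orgId only
  final val V2 = 2 // orgId, presence flag, optional operationId

  // Writes the old layout, standing in for state saved before the change.
  def writeV1(orgId: Int): Array[Byte] = {
    val buf = new ByteArrayOutputStream()
    val out = new DataOutputStream(buf)
    out.writeByte(V1)
    out.writeInt(orgId)
    out.flush()
    buf.toByteArray
  }

  // Writes the new layout, including a presence flag for the Option.
  def writeV2(r: Record): Array[Byte] = {
    val buf = new ByteArrayOutputStream()
    val out = new DataOutputStream(buf)
    out.writeByte(V2)
    out.writeInt(r.orgId)
    out.writeBoolean(r.operationId.isDefined)
    r.operationId.foreach(s => out.writeUTF(s))
    out.flush()
    buf.toByteArray
  }

  // Reads either layout; old payloads get None for the field they never had.
  def read(bytes: Array[Byte]): Record = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    in.readByte().toInt match {
      case V1 => Record(in.readInt())
      case V2 =>
        val orgId = in.readInt()
        val operationId = if (in.readBoolean()) Some(in.readUTF()) else None
        Record(orgId, operationId)
      case other =>
        throw new IllegalStateException(s"Unknown layout version: $other")
    }
  }
}
```

The point of the version byte is that the reader decides how to interpret the remaining bytes before touching them, so the missing field is filled in deliberately rather than discovered as a read error.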

On Oct 8, 2019, at 11:38 AM, Steven Nelson <[hidden email]> wrote: [quoted messages trimmed]

Re: Problem with savepoint deserialization

Steven Nelson
No problem :)

I wasn’t able to find documentation on what can and cannot be upgraded for case classes. I had assumed that the same rules that apply to POJO schema upgrades also apply to case classes. Has someone put together rules for case classes? I also should have mentioned that we are running Flink 1.9.



On Oct 8, 2019, at 3:03 PM, Aleksandar Mastilovic <[hidden email]> wrote: [quoted messages trimmed]

Re: Problem with savepoint deserialization

Congxian Qiu
Hi Steven,

From the exception, it seems the serializers used before and after the change are incompatible. I'm not very familiar with Scala case classes; maybe you can debug it locally to see which serializer is used for the case class before and after the change.

Best,
Congxian
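
As a way to think about that before/after comparison, here is a toy model in plain Scala. It assumes that a savepoint records some description of the serializer that wrote the state, and that on restore this description is compared with the new serializer. FieldLayout, resolve and the three outcomes are hypothetical names chosen for this sketch; they are not Flink classes, and the real check may work differently.

```scala
object CompatibilityCheckSketch {
  // Hypothetical stand-in for what a savepoint remembers about a serializer:
  // the ordered field names and types it wrote.
  final case class FieldLayout(fields: List[(String, String)])

  sealed trait Compatibility
  case object CompatibleAsIs extends Compatibility
  case object CompatibleAfterMigration extends Compatibility
  case object Incompatible extends Compatibility

  // canMigrate models whether the new serializer knows how to rewrite old state.
  // Only appending fields to the end is treated as migratable in this sketch.
  def resolve(restored: FieldLayout, current: FieldLayout, canMigrate: Boolean): Compatibility =
    if (restored == current) CompatibleAsIs
    else if (canMigrate && current.fields.startsWith(restored.fields)) CompatibleAfterMigration
    else Incompatible

  // The two shapes of Record from the original post.
  val before: FieldLayout = FieldLayout(List("orgId" -> "Int"))
  val after: FieldLayout = FieldLayout(List("orgId" -> "Int", "operationId" -> "Option[String]"))
}
```

In this model, a job started without a savepoint never runs the comparison at all, which would be consistent with the observation earlier in the thread that a fresh start followed by a new savepoint worked.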


On Wed, Oct 9, 2019, at 4:09 AM, Steven Nelson <[hidden email]> wrote: [quoted messages trimmed]