Changing the way keys are defined breaks savepoints


Changing the way keys are defined breaks savepoints

Andrea Gallina
Hi everyone,

I have a job running in production whose structure is approximately this:

stream
    .filter(inboundData -> inboundData.hasToBeFiltered())
    .keyBy("myKey")
    .process(doSomething());

I've recently decided to test the extent to which I can change a job's
structure without breaking backward compatibility; more specifically,
I've tried to change the way the key is defined in the keyBy() operator
by defining it as a lambda function rather than by field expression. The
modified structure would therefore look like this:

stream
    .filter(inboundData -> inboundData.hasToBeFiltered())
    .keyBy(inboundData -> inboundData.getMyKey())
    .process(doSomething());

I then tried to run the new job by restoring the savepoint taken with
the old structure, but I get a state migration exception:

org.apache.flink.util.StateMigrationException: The new key serializer
must be compatible

Now this was a bit unexpected, since changing how the key is defined does
not seem like a breaking change (unlike changing the actual key used for
partitioning).

Is this expected behavior, or am I missing something?

Thanks


---
This e-mail has been checked for viruses by Avast antivirus.
https://www.avast.com/antivirus


Re: Changing the way keys are defined breaks savepoints

Congxian Qiu
Hi,

From what you described, does the key have the same type before and after the change? As for using a lambda expression, the documentation[1] may be helpful (especially the limitations it describes).
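
One likely cause worth checking (this is an assumption on my side, and the sketch below assumes getMyKey() returns a String; adjust to your actual key type): keyBy with a field expression derives a Tuple-wrapped key type, while a lambda KeySelector keys by the raw field type, so the key serializer stored in the savepoint no longer matches the new one. A rough, untested sketch of how the two key types could be compared, and of a selector that keeps a Tuple-shaped key:

```java
// Untested sketch -- assumes InboundData.getMyKey() returns a String.

// Old job: expression keys are internally wrapped in a Tuple, so the
// savepoint's key serializer is a Tuple serializer.
KeyedStream<InboundData, Tuple> oldKeyed = stream.keyBy("myKey");

// New job: the lambda keys by the raw String.
KeyedStream<InboundData, String> newKeyed =
        stream.keyBy(inboundData -> inboundData.getMyKey());

// Printing the derived key types makes any mismatch visible:
System.out.println(oldKeyed.getKeyType());  // a Tuple type
System.out.println(newKeyed.getKeyType());  // String

// One possible way to move to a KeySelector while keeping a
// Tuple1-shaped key type, with an explicit type hint since type
// extraction for lambdas is limited:
KeyedStream<InboundData, Tuple1<String>> compatibleKeyed = stream.keyBy(
        inboundData -> Tuple1.of(inboundData.getMyKey()),
        TypeInformation.of(new TypeHint<Tuple1<String>>() {}));
```

Whether this actually restores compatibility depends on the serializer Flink derived for the old expression key, so I would test it against a copy of the savepoint first.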


Andrea Gallina <[hidden email]> wrote on Monday, August 12, 2019 at 7:45 PM: