Working around "The new key serializer must be compatible"

Oytun Tez
Hi there,

We have a new key in one of our operators, and are getting this error when the job tries to start from a savepoint: 

[image.png: screenshot of the error, not preserved]

Is there any way to specify a custom key serializer for the operator? How can I migrate a key? I understand how to migrate the states themselves, but I have no idea how to migrate keys.

A second, related question: we have multiple operators in the job. Is it possible to continue the job from the savepoint, but start with empty state for those operators whose state cannot be restored from it (for instance, this operator with the key issue)?

Any ideas?




---
Oytun Tez

M O T A W O R D
The World's Fastest Human Translation Platform.

Re: Working around "The new key serializer must be compatible"

Oytun Tez
Thank you, Seth!

I was afraid that the "--allowNonRestoredState" flag would start the whole job with a clean slate, rather than only the failed operator. Can you confirm that only the failed operator will start with empty state, while the other operators continue from the savepoint (assuming their state is healthy)?

On Mon, Aug 19, 2019 at 12:20 PM Seth Wiesman <[hidden email]> wrote:
Hi Oytun,

Key migration is explicitly disallowed in Flink, for two reasons:

1) The key may be migrated in such a way that two or more keys clash [1].
2) The migrated key may result in a different key group assignment. Key groups are how Flink routes keys during a keyBy, and they are used to redistribute state during a restore or rescaling. Key groups are immutable by design.
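To make reason 2 concrete, here is a minimal sketch, assuming a simplified assignment rule: key group = `hashCode()` modulo the max parallelism, and subtask = key group times parallelism divided by max parallelism. Flink itself additionally scrambles `hashCode()` with a murmur hash before taking the modulo, so the concrete numbers below are not what Flink would compute. The class name, the example keys, and the max parallelism of 128 are illustrative choices. The point is only that changing the key (here from a `Long` to a `String` holding the same id) changes its hash, and therefore where its state is expected to live.

```java
import java.util.Objects;

/**
 * Simplified, hypothetical model of key group assignment.
 * NOTE: Flink applies a murmur hash to hashCode() before the modulo; that step is
 * omitted here on purpose, so only the principle (not the numbers) matches Flink.
 */
public class KeyGroupSketch {

    /** Key group of a key: hash modulo max parallelism (murmur step omitted). */
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(Objects.hashCode(key), maxParallelism);
    }

    /** Subtask owning a key group, assuming contiguous key-group ranges per subtask. */
    static int operatorIndex(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // illustrative value
        int parallelism = 4;      // illustrative value

        Object oldKey = 42L;  // hypothetical old key: a Long id
        Object newKey = "42"; // hypothetical new key: the same id as a String

        int oldGroup = keyGroup(oldKey, maxParallelism);
        int newGroup = keyGroup(newKey, maxParallelism);

        System.out.println("old key -> key group " + oldGroup + ", subtask "
                + operatorIndex(oldGroup, maxParallelism, parallelism));
        System.out.println("new key -> key group " + newGroup + ", subtask "
                + operatorIndex(newGroup, maxParallelism, parallelism));
    }
}
```

With these numbers the old key lands in key group 42 (subtask 1) and the new key in key group 126 (subtask 3), so records for the "same" entity would be routed to a subtask that does not hold its restored state.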

Regarding your second question: yes. Simply assign a different uid to the operator and then restore with the `--allowNonRestoredState` flag [2, 3].
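The following toy model sketches the restore rules described above, and bears on the follow-up question of whether only the re-keyed operator starts empty. It assumes a savepoint can be viewed as a map from operator uid to that operator's state (Flink actually derives an operator ID from the uid; the uid string is used directly here). The class, method, and uids are hypothetical. Operators whose uid has no entry start empty; savepoint entries that no operator claims make the restore fail unless `allowNonRestoredState` is set; all other operators restore normally.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of matching savepoint state to operators by uid. */
public class RestoreSketch {

    /** Returns the state each operator of the new job starts with. */
    static Map<String, String> restore(Map<String, String> savepoint,
                                       List<String> newJobUids,
                                       boolean allowNonRestoredState) {
        Map<String, String> restored = new LinkedHashMap<>();
        for (String uid : newJobUids) {
            // An operator with no matching savepoint entry starts with empty state.
            restored.put(uid, savepoint.getOrDefault(uid, "<empty>"));
        }
        for (String uid : savepoint.keySet()) {
            // Unclaimed savepoint state is only skipped when explicitly allowed.
            if (!restored.containsKey(uid) && !allowNonRestoredState) {
                throw new IllegalStateException(
                        "Savepoint state for uid '" + uid + "' cannot be mapped to the new job");
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        Map<String, String> savepoint = new HashMap<>();
        savepoint.put("source", "offsets");
        savepoint.put("aggregator", "counts keyed by the old key");
        savepoint.put("sink", "pending writes");

        // The re-keyed operator gets a new (hypothetical) uid; the others keep theirs.
        List<String> newJob = List.of("source", "aggregator-v2", "sink");

        System.out.println(restore(savepoint, newJob, true));
    }
}
```

In this model only `aggregator-v2` starts empty; `source` and `sink` keep their state, and the old `aggregator` entry is dropped. Calling `restore` with `false` instead throws, which mirrors a restore without the flag.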

[1] https://issues.apache.org/jira/browse/FLINK-13788 (I've opened a ticket to add this to the docs)

On Mon, Aug 19, 2019 at 10:57 AM Oytun Tez <[hidden email]> wrote:


--

Seth Wiesman | Solutions Architect

+1 314 387 1463



Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen

Re: Working around "The new key serializer must be compatible"

Andrea Gallina

Hi Oytun,

My team and I faced the exact same issue two weeks ago (we had to change the key used in the keyBy() operator, ideally without losing the state contained in our savepoint). After a few days of experimentation, we gave up and started over with an empty state.

About 30 minutes after we had deployed the new version with the clean state, we found out that version 1.9 has a new API (the State Processor API: https://issues.apache.org/jira/browse/FLINK-12047) that lets you manipulate a savepoint in various ways. So we started fiddling with its tests to see how to use it, and we were able to do what we needed: read the old state from the savepoint, write it to a new state with a different partition key, and restart the job from the modified savepoint. You might want to check it out if you haven't already dumped your state (as we did :P).
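The core of the approach described above is a re-keying step: take every (old key, state) pair and write it under a new key. The sketch below models only that step on a plain in-memory map; it does not use the State Processor API, and the class, method, and data are hypothetical. It also shows why Flink cannot do this automatically (reason 1 in Seth's reply): when two old keys map to the same new key, someone has to decide how to merge their state, so the caller passes an explicit merge function.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

/**
 * Hypothetical model of re-keying keyed state read from an old savepoint.
 * A plain Map stands in for the keyed state; this is not a Flink API.
 */
public class RekeySketch {

    /** Re-keys old state; onClash merges values whose old keys map to one new key. */
    static <K1, K2, V> Map<K2, V> rekey(Map<K1, V> oldState,
                                        Function<K1, K2> newKeyOf,
                                        BinaryOperator<V> onClash) {
        Map<K2, V> newState = new HashMap<>();
        for (Map.Entry<K1, V> e : oldState.entrySet()) {
            newState.merge(newKeyOf.apply(e.getKey()), e.getValue(), onClash);
        }
        return newState;
    }

    public static void main(String[] args) {
        // Hypothetical old state: counts keyed by a case-sensitive user name.
        Map<String, Long> old = new HashMap<>();
        old.put("Alice", 3L);
        old.put("alice", 2L);
        old.put("Bob", 5L);

        // Hypothetical migration to lower-cased names: "Alice" and "alice" clash,
        // and their counts are summed.
        Map<String, Long> migrated =
                rekey(old, name -> name.toLowerCase(Locale.ROOT), Long::sum);
        System.out.println(migrated);
    }
}
```

The resulting map has two entries, `alice` with 5 and `bob` with 5. In a real migration the re-keyed state would then be written into a new savepoint, where Flink computes the key groups for the new keys.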

Re: Working around "The new key serializer must be compatible"

Oytun Tez
Hi Andrea!

Indeed, the Savepoint API is going to be very useful; we've been following its development. For now, this state wasn't too important for us, so we are ditching it.

But once 1.9 is GA, we will make much more use of the Savepoint API, especially for end-users.

On Mon, Aug 19, 2019 at 2:58 PM Andrea Gallina <[hidden email]> wrote: