Re: possible backwards compatibility issue between 1.8->1.9?


Piotr Nowojski-3

(This question is more appropriate for the user mailing list than for dev. When responding to my e-mail, please remove the dev mailing list from the recipients; I’ve kept it only as an FYI that the discussion has moved to the user mailing list.)

Could the problem be caused by the change in the chaining strategy of the AsyncWaitOperator in 1.9, as explained in the release notes [1]?

> AsyncIO
> Due to a bug in the AsyncWaitOperator, in 1.9.0 the default chaining behaviour of the operator is now changed so that it 
> is never chained after another operator. This should not be problematic for migrating from older version snapshots as 
> long as an uid was assigned to the operator. If an uid was not assigned to the operator, please see the instructions here [2]
> for a possible workaround.
> Related issues:
> • FLINK-13063: AsyncWaitOperator shouldn’t be releasing checkpointingLock [3]


On 30 Oct 2019, at 16:52, Bekir Oguz <[hidden email]> wrote:

Hi guys,
during our upgrade from 1.8.1 to 1.9.1, one of our jobs fails to start with
the following exception. We deploy the job with the 'allow-non-restored-state'
option, restoring from the latest checkpoint dir of the 1.8.1 version.

org.apache.flink.util.StateMigrationException: The new state typeSerializer
for operator state must not be incompatible.
   at org.apache.flink.runtime.state.DefaultOperatorStateBackend
   at org.apache.flink.runtime.state.DefaultOperatorStateBackend
   at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator
   at org.apache.flink.streaming.api.operators.AbstractStreamOperator
   at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(
   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
   at org.apache.flink.runtime.taskmanager.Task.doRun(

We see from the Web UI that the 'async wait operator' is causing this,
which is not changed at all during this upgrade.

All other jobs are migrated without problems, only this one is failing. Has
anyone else experienced this during migration?

Bekir Oguz


Piotr Nowojski-3

It sounds strange. 

In the second example, aren’t you setting the “name” and “uid” only for the last “map” transformation, while you would like to set them for `unorderedWait` and `filter` as well? You can probably verify this in your application logs. Can you check which uids are actually being set and used?
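To illustrate the point about where `uid` and `name` end up, here is a minimal sketch that assigns them to every operator separately. It uses the Scala `AsyncDataStream` wrapper rather than the `JavaAsyncDataStream` from your job, and the `EnricherFunction` body, the element types and all uid strings are assumptions made up for the example, not taken from your code.

```scala
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

// Hypothetical stand-in for the real enrichment function.
class EnricherFunction extends AsyncFunction[String, String] {
  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit =
    resultFuture.complete(Iterable(input.toUpperCase))
}

object UidPerOperatorSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val source: DataStream[String] = env.fromElements("a", "b", "")

    // uid/name apply only to the operator they are called on,
    // so each transformation gets its own pair (uid strings are made up).
    val enriched: DataStream[String] = AsyncDataStream
      .unorderedWait(source, new EnricherFunction, 60, TimeUnit.SECONDS)
      .uid("enricher-async")
      .name("enricher-async")

    val filtered: DataStream[String] = enriched
      .filter(_.nonEmpty)
      .uid("enricher-filter")
      .name("enricher-filter")

    val mapped: DataStream[Int] = filtered
      .map(_.length)
      .uid("enricher-map")
      .name("enricher-map")

    mapped.print()
    env.execute("uid-per-operator-sketch")
  }
}
```

If the calls were instead chained as `unorderedWait(...).filter(...).map(...).uid(...)`, only the final `map` would receive the uid, and the async operator would keep an auto-generated one.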

For the first example, I don’t know. Could it be an issue with Scala and some implicit magic/wrapping (since you are using JavaAsyncDataStream)? Maybe something adds a mapping operator to convert types, so that the `uid` is assigned to that operator rather than to the AsyncWaitOperator?

Also, to be honest, the exception message doesn’t look like the one I would expect to see. It looks more like an issue with state migration. Gordon, could you take a look? You might also be a bit more familiar with the quirks of setting the `uid`.


On 31 Oct 2019, at 17:51, Bekir Oguz <[hidden email]> wrote:

Hi Piotr,

We missed this note in the release notes, but we are still surprised to hit this bug, since our implementation conforms to the workaround described there.
The odd part is that we use this async stream in 2 different jobs, and operator chaining is disabled in the TEST environment but enabled in PROD.

Our first job looks like this:
val asyncStream =
.unorderedWait(stream.javaStream, new Enricher(config), 60, TimeUnit.SECONDS)
startNewChain() already sets the ChainingStrategy to HEAD (similar to the bugfix for FLINK-13063), and we assign a unique UID, as suggested in the workaround.

Our second job looks like this:
new EnricherFunction(config),
.filter(new EnricherFilter)

In the test environment, the first job could not restore from the last checkpoint (we started it with no state to fix this), but the second job succeeded.
In the prod environment, the failure and the solution were the same for the first job. But then the second job failed to restore its state (unlike in test). Since this job also has user state in a KeyedProcessFunction, starting without state was not an option for us. We tried restarting it with "operator chaining disabled", and then, surprisingly, it worked.

How can we explain this different behaviour of the second job in test and prod? The only visible difference is the operator chaining config.
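For anyone who wants to reproduce the test/prod difference in a single build, a minimal sketch of making the chaining setting an explicit switch could look like the following. The command-line flag, the toy pipeline and the uid string are assumptions for illustration only and are not our actual job.

```scala
import org.apache.flink.streaming.api.scala._

object ChainingToggleSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Hypothetical switch: pass "false" as the first argument to disable chaining
    // (the TEST-like setup); with no argument, chaining stays enabled (PROD-like).
    val chainingEnabled = args.headOption.forall(_.toBoolean)
    if (!chainingEnabled) {
      env.disableOperatorChaining()
    }

    // Toy pipeline; the uid string is made up for the example.
    env
      .fromElements(1, 2, 3)
      .map(_ + 1)
      .uid("plus-one")
      .print()

    env.execute("chaining-toggle-sketch")
  }
}
```

Running the same job both ways and comparing the operator graph in the Web UI should show which operators end up in the same chain in each setup.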

Thanks in advance,
