Problem when restoring from savepoint with missing state & POJO modification


bastien dine
Hello,
We have run into a strange issue with a POJO MapState in a streaming job: checkpointing fails after we remove a state, then modify the state's POJO class, and restore the job from a savepoint.

Caused by: java.lang.NullPointerException
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:123)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:186)

Reproduced in Flink 1.10 & 1.11
(full stack below)  

Context:
We have a streaming job with a state named "buffer", holding the POJO Buffer, inside a CoFlatMap function.

MyCoFlat:
public class MyCoFlat extends RichCoFlatMapFunction<Pojo1, Pojo1, v> {
    transient MapState<String, Buffer> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("buffer", String.class, Buffer.class));
    }
    ....

Buffer:
public class Buffer {
    private String field1;
    private String field2;
    private String field3;
    ... + empty constructor + getters / setters so that it qualifies as a POJO


We had some trouble with our job, so we reworked two things:
 - we removed field2 from the Buffer class,
 - we stopped using the "buffer" state.

When restoring from the savepoint (with --allowNonRestoredState) we get the NullPointerException shown above.
The job is submitted to the cluster but fails on checkpointing, and it stays completely stuck.

Debug:
Debugging showed that the exception is raised here, as the stack trace suggests:

public PojoSerializer(
        Class<T> clazz,
        TypeSerializer<?>[] fieldSerializers,
        Field[] fields,
        ExecutionConfig executionConfig) {
    this.clazz = checkNotNull(clazz);
    this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
    this.fields = checkNotNull(fields);
    this.numFields = fieldSerializers.length;
    this.executionConfig = checkNotNull(executionConfig);
    for (int i = 0; i < numFields; i++) {
        this.fields[i].setAccessible(true); // <---- HERE
    }


In the fields array, fields[0] and fields[2] are set but fields[1] (the slot for the removed field2) is missing, which is why we get the NPE here when i = 1.
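
The failure mode described above can be reproduced outside Flink. The following is a minimal sketch, not Flink's actual restore code: it assumes the field list of the old schema is resolved by name against the current class, and that a name which no longer exists leaves a null slot. The class MissingFieldDemo and the helpers resolveFields and firstFailingIndex are hypothetical names used only for illustration.

```java
import java.lang.reflect.Field;

public class MissingFieldDemo {
    // Stand-in for the modified POJO: field2 has been removed.
    public static class Buffer {
        private String field1;
        private String field3;
    }

    // Assumption: old field names are looked up on the current class,
    // and a name that no longer exists leaves a null entry in the array.
    public static Field[] resolveFields(Class<?> clazz, String[] oldFieldNames) {
        Field[] fields = new Field[oldFieldNames.length];
        for (int i = 0; i < oldFieldNames.length; i++) {
            try {
                fields[i] = clazz.getDeclaredField(oldFieldNames[i]);
            } catch (NoSuchFieldException e) {
                fields[i] = null; // removed field: slot stays empty
            }
        }
        return fields;
    }

    // Same loop shape as the constructor quoted above; returns the index
    // at which the NullPointerException occurs, or -1 if none occurs.
    public static int firstFailingIndex(Field[] fields) {
        for (int i = 0; i < fields.length; i++) {
            try {
                fields[i].setAccessible(true);
            } catch (NullPointerException e) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        Field[] fields = resolveFields(
            Buffer.class, new String[] {"field1", "field2", "field3"});
        System.out.println(firstFailingIndex(fields)); // prints 1
    }
}
```

The loop fails at index 1, matching the i = 1 observed in the debugger.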

So what we did was put this state back into our streaming job (with the missing field and the POJO), redeploy from the old savepoint, and this went fine.
Then we redeployed the job without this state.
This made it a two-step deployment for our job (1 -> modify the POJO, 2 -> remove the state using this POJO).
But the no-longer-used state (or at least its serializer) is still in the savepoints, so we could face this problem again when we modify the Buffer POJO later.
Finally, we modified a savepoint with the API to remove this state once and for all, and restarted from it.

I have a couple of questions here:
Why does Flink keep an unused state in a savepoint (even when it cannot map it onto the new topology and allowNonRestoredState is set)?
Why does Flink not handle this case? The behaviour seems to differ between a POJO state that is still in use and this unused POJO state.
How can I clean my savepoints? I don't want them to contain unused state.

If anybody has experienced an issue like this before or knows how to handle it, I would be glad to discuss!
Best regards,

------------------

Bastien DINE
Data Architect / Software Engineer / Sysadmin

Re: Problem when restoring from savepoint with missing state & POJO modification

Yun Tang
Hi Bastien,

Flink lets you register state through a state descriptor when calling runtimeContext.getState(). However, once a state is registered, it cannot be removed anymore, and when you restore from a savepoint, the previous state is registered again [1]. Flink does not drop state directly, but you could use the State Processor API [2] to remove the related state.
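
To make the consequence of that explanation concrete, here is a toy model of the behaviour described, under the assumption that everything registered at restore time is written into the next snapshot. It is not Flink code: ToyStateRegistry, its methods, and the "counter" state are hypothetical and exist only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ToyStateRegistry {
    // State name -> serializer description; stands in for the
    // metadata a state backend keeps for each registered state.
    private final Map<String, String> registered = new LinkedHashMap<>();

    // Restoring re-registers every state found in the snapshot,
    // whether or not the new job ever asks for it.
    public void restore(Map<String, String> snapshot) {
        registered.putAll(snapshot);
    }

    // What a function does in open(): register the state it uses.
    public void getState(String name, String serializer) {
        registered.put(name, serializer);
    }

    // A new snapshot contains everything that is registered.
    public Map<String, String> snapshot() {
        return new LinkedHashMap<>(registered);
    }

    public static void main(String[] args) {
        Map<String, String> oldSavepoint = new LinkedHashMap<>();
        oldSavepoint.put("buffer", "PojoSerializer<Buffer>");
        oldSavepoint.put("counter", "LongSerializer");

        ToyStateRegistry backend = new ToyStateRegistry();
        backend.restore(oldSavepoint);
        // The new job only declares "counter"; "buffer" is never requested.
        backend.getState("counter", "LongSerializer");

        System.out.println(backend.snapshot().keySet()); // prints [buffer, counter]
    }
}
```

In this model "buffer" survives into every later snapshot even though the job no longer declares it, which is the situation the original post describes.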




From: bastien dine <[hidden email]>
Sent: Tuesday, December 8, 2020 0:28
To: user <[hidden email]>
Subject: Problem when restoring from savepoint with missing state & POJO modification
 

Re: Problem when restoring from savepoint with missing state & POJO modification

bastien dine
Hello Yun,
Thank you very much for your response, that's what I thought.
However, it does not seem possible to remove only one state using the State Processor API.
We use it a lot, and we can only remove all of an operator's states, not one specific state.
Am I missing something?

Best Regards,
Bastien

------------------

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


On Tue, Dec 8, 2020 at 08:54, Yun Tang <[hidden email]> wrote:

Re: Problem when restoring from savepoint with missing state & POJO modification

Yun Tang
Hi Bastien,

I think you could look at WritableSavepoint#write [1] to see how to get all existing state, and then use a flat map to drop the state you do not want (see StatePathExtractor [2]).
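
The idea of keeping everything except one named state can be sketched on a plain data structure. This is only a model of the filtering step, not the State Processor API: the savepoint is represented as a map from operator uid to named state blobs, and ToySavepointFilter, dropState, the uid "my-coflat", and the states "other" and "offsets" are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ToySavepointFilter {
    // Returns a copy of the savepoint in which one named state is dropped
    // from one operator; all other operators and states are kept.
    public static Map<String, Map<String, byte[]>> dropState(
            Map<String, Map<String, byte[]>> savepoint,
            String operatorUid,
            String stateName) {
        Map<String, Map<String, byte[]>> result = new LinkedHashMap<>();
        for (Map.Entry<String, Map<String, byte[]>> op : savepoint.entrySet()) {
            Map<String, byte[]> states = new LinkedHashMap<>(op.getValue());
            if (op.getKey().equals(operatorUid)) {
                states.remove(stateName);
            }
            result.put(op.getKey(), states);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, byte[]> coFlatStates = new LinkedHashMap<>();
        coFlatStates.put("buffer", new byte[] {1, 2, 3});
        coFlatStates.put("other", new byte[] {4});

        Map<String, byte[]> sinkStates = new LinkedHashMap<>();
        sinkStates.put("offsets", new byte[] {5});

        Map<String, Map<String, byte[]>> savepoint = new LinkedHashMap<>();
        savepoint.put("my-coflat", coFlatStates);
        savepoint.put("sink", sinkStates);

        Map<String, Map<String, byte[]>> cleaned =
            dropState(savepoint, "my-coflat", "buffer");
        System.out.println(cleaned.get("my-coflat").keySet()); // prints [other]
        System.out.println(cleaned.get("sink").keySet());      // prints [offsets]
    }
}
```

Unlike removing the whole operator, this keeps the operator's remaining states and leaves the input savepoint unmodified.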



Best
Yun Tang

From: bastien dine <[hidden email]>
Sent: Wednesday, December 9, 2020 21:17
To: Yun Tang <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: Problem when restoring from savepoint with missing state & POJO modification
 

Re: Problem when restoring from savepoint with missing state & POJO modification

sardaesp
I'm having the same issue and I don't understand the explanations. I'm using
Flink 1.9.3 and a KeyedCoProcessFunction. My state's class is still being
used (it's a POJO), but I removed one of its fields. According to the
documentation
<https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/schema_evolution.html#supported-data-types-for-schema-evolution>,
this should be supported. The example
<https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#example>
shows getRuntimeContext().getState() being used in the open() method, and
that's what I do as well.

I get the exact same null pointer exception. If "once the state is
registered, it cannot be removed anymore", then why does the documentation
state otherwise?

Regards,
Alexis.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Problem when restoring from savepoint with missing state & POJO modification

Arvid Heise-4
Hi Alexis,

Could you open a new thread and post your exception? It sounds as if it should work, but for some reason it does not.

Did you double check that the PojoSerializer is used?

On Wed, Mar 10, 2021 at 10:27 PM sardaesp <[hidden email]> wrote: