Hello,
We have experienced some weird issues with a POJO MapState in a streaming job: upon checkpointing, after removing the state, then modifying the state POJO and restoring the job, we get:

Caused by: java.lang.NullPointerException
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.&lt;init&gt;(PojoSerializer.java:123)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:186)
(full stack below)

Reproduced in Flink 1.10 & 1.11.

Context: we have a streaming job with a state named "buffer" holding a POJO Buffer, inside a CoFlatMap function MyCoFlat:

    public class MyCoFlat extends RichCoFlatMapFunction<Pojo1, Pojo1, V> {
        transient MapState<String, Buffer> buffer;

        @Override
        public void open(Configuration parameters) {
            buffer = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("buffer", String.class, Buffer.class));
        }
        ....

Buffer:

    public class Buffer {
        private String field1;
        private String field2;
        private String field3;
        ...
        // + empty constructor + getters/setters for POJO consideration
    }

We had some trouble with our job, so we reworked two things:
- we removed field2 from the Buffer class,
- we stopped using the "buffer" state.

When restoring from a savepoint (--allowNonRestoredState) we get the exception below. The job is submitted to the cluster but fails on checkpointing; the job is totally stuck.
Debug: debugging showed us that the exception is raised here (as expected):

    public PojoSerializer(
            Class<T> clazz,
            TypeSerializer<?>[] fieldSerializers,
            Field[] fields,
            ExecutionConfig executionConfig) {
        this.clazz = checkNotNull(clazz);
        this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
        this.fields = checkNotNull(fields);
        this.numFields = fieldSerializers.length;
        this.executionConfig = checkNotNull(executionConfig);
        for (int i = 0; i < numFields; i++) {
            this.fields[i].setAccessible(true);    <---- HERE
        }

In our fields array we have fields[0] and fields[2], but fields[1] is missing entirely; that is why we get the NPE here, when i = 1.

So what we did was put this state back into our streaming job (with the missing field and POJO) and redeploy with the old savepoint, which went totally fine. Then we redeployed a job without this state. So it took a two-step deployment for our job (1 -> modify the POJO, 2 -> remove the state using this POJO). But the no-longer-used state (at least its serializer) is still in the savepoints, so we could face this problem again when we modify the Buffer POJO later. In the end we just modified a savepoint with the API, removed this state once and for all, and restarted from it.

I have a couple of questions here:
- Why does Flink keep an unused state in a savepoint, even when it cannot map it into the new topology and allowNonRestoredState is set?
- Why does Flink not handle this case? The behaviour seems to differ between an existing POJO state and this unused POJO state.
- How can I clean my savepoints? I don't want them to contain unused state.

If anybody has experienced an issue like this before or knows how to handle it, I would be glad to discuss!

Best regards,
------------------
Bastien DINE
Data Architect / Software Engineer / Sysadmin
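The failure mode above can be reproduced outside Flink with plain reflection. The sketch below (my own illustration, not Flink code — class and field names are made up to mirror the report) shows how resolving field names recorded in an old serializer snapshot against a class that no longer has one of those fields yields a null slot in the Field[] array, and how looping setAccessible(true) over that array then throws the same NullPointerException:

```java
import java.lang.reflect.Field;

public class MissingFieldNpe {

    // Stand-in for the restored POJO: field2 was removed between savepoint and restore.
    static class BufferV2 {
        private String field1;
        private String field3;
    }

    public static void main(String[] args) {
        // Field names as they were captured in the old savepoint's serializer snapshot:
        String[] snapshotFieldNames = {"field1", "field2", "field3"};
        Field[] fields = new Field[snapshotFieldNames.length];
        for (int i = 0; i < snapshotFieldNames.length; i++) {
            try {
                fields[i] = BufferV2.class.getDeclaredField(snapshotFieldNames[i]);
            } catch (NoSuchFieldException e) {
                fields[i] = null; // field2 is gone -> null slot, like fields[1] in the report
            }
        }
        try {
            for (Field f : fields) {
                f.setAccessible(true); // NPE when f == null, mirroring PojoSerializer.<init>
            }
        } catch (NullPointerException expected) {
            System.out.println("NPE at the null field slot, as in the report");
        }
    }
}
```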
Hi Bastien,
Flink supports registering state via a state descriptor when calling runtimeContext.getState(). However, once the state is registered, it cannot be removed anymore, and when you restore from a savepoint, the previous state is registered again [1]. Flink does not drop state directly; you could use the State Processor API [2] to remove the related state.
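For reference, dropping an operator's state from a savepoint with the State Processor API looks roughly like this. This is a sketch against the Flink 1.10/1.11 batch-style API; the savepoint paths, the state backend choice, and the operator uid "my-coflat-uid" are all placeholders, and note that this removes all state registered under that operator, not a single named state:

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

public class DropOperatorState {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Load the old savepoint (path and backend are placeholders for this sketch).
        ExistingSavepoint savepoint =
                Savepoint.load(env, "hdfs:///savepoints/savepoint-xyz", new MemoryStateBackend());

        // Drop ALL state of the operator that owned the "buffer" MapState,
        // then write a new, cleaned savepoint to restore from.
        savepoint
                .removeOperator("my-coflat-uid")
                .write("hdfs:///savepoints/savepoint-xyz-cleaned");

        env.execute("remove unused state from savepoint");
    }
}
```

This requires the flink-state-processor-api dependency, and the new job would then be started with `-s hdfs:///savepoints/savepoint-xyz-cleaned` (without needing --allowNonRestoredState for that operator).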
From: bastien dine <[hidden email]>
Sent: Tuesday, December 8, 2020 0:28
To: user <[hidden email]>
Subject: Problem when restoring from savepoint with missing state & POJO modification
Hello Yun,

Thank you very much for your response, that's what I thought. However, it does not seem possible to remove only one state using the State Processor API. We use it a lot, and we can only remove all of an operator's states, not one specifically. Am I missing something?

Best Regards,
Bastien

On Tue, Dec 8, 2020 at 08:54, Yun Tang <[hidden email]> wrote:
Hi Bastien,
I think you could refer to WritableSavepoint#write [1] to read all existing state and flat-map it so as to remove the state you do not want (see StatePathExtractor [2] for an example of such a transformation).
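A rough sketch of that read-and-rewrite approach: read the keyed state you want to keep, bootstrap a fresh operator from it, and swap it into the savepoint. All class names, state names, uids, and paths below are hypothetical, and the example assumes a single ValueState to keep besides the unwanted "buffer" MapState (which is simply never read, so it does not end up in the new savepoint):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class RewriteOperatorState {

    // Hypothetical carrier for the state we want to KEEP (everything but "buffer").
    public static class Keep {
        public String key;
        public String otherState;
    }

    // Reads only the state to keep; the "buffer" MapState is never registered here.
    static class KeepReader extends KeyedStateReaderFunction<String, Keep> {
        ValueState<String> other;

        @Override
        public void open(Configuration parameters) {
            other = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("other", String.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Keep> out) throws Exception {
            Keep k = new Keep();
            k.key = key;
            k.otherState = other.value();
            out.collect(k);
        }
    }

    // Writes the kept state back into a fresh operator.
    static class KeepWriter extends KeyedStateBootstrapFunction<String, Keep> {
        ValueState<String> other;

        @Override
        public void open(Configuration parameters) {
            other = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("other", String.class));
        }

        @Override
        public void processElement(Keep value, Context ctx) throws Exception {
            other.update(value.otherState);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint =
                Savepoint.load(env, "hdfs:///savepoints/old", new MemoryStateBackend());

        DataSet<Keep> kept = savepoint.readKeyedState("my-coflat-uid", new KeepReader());

        BootstrapTransformation<Keep> rewritten = OperatorTransformation
                .bootstrapWith(kept)
                .keyBy(new KeySelector<Keep, String>() {
                    @Override
                    public String getKey(Keep k) { return k.key; }
                })
                .transform(new KeepWriter());

        savepoint
                .removeOperator("my-coflat-uid")
                .withOperator("my-coflat-uid", rewritten)
                .write("hdfs:///savepoints/cleaned");

        env.execute("rewrite operator state without the unused buffer state");
    }
}
```

The caveat Bastien raises still holds: this rebuilds the operator's state wholesale rather than deleting one named state in place.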
Best
Yun Tang
From: bastien dine <[hidden email]>
Sent: Wednesday, December 9, 2020 21:17
To: Yun Tang <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: Problem when restoring from savepoint with missing state & POJO modification
I'm having the same issue and I don't understand the explanations. I'm using Flink 1.9.3 and a KeyedCoProcessFunction. My state's class is still being used (it's a POJO), but I removed one of its fields. According to the documentation <https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/schema_evolution.html#supported-data-types-for-schema-evolution>, this should be supported. The example <https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#example> shows getRuntimeContext().getState() being used in the open() method, and that's what I do as well. I get the exact same null pointer exception.

If "once the state is registered, it cannot be removed anymore", then why does the documentation state otherwise?

Regards,
Alexis.
Hi Alexis,

Could you open a new thread and post your exception? It sounds as if it should work, but it's not working for some reason. Did you double-check that the PojoSerializer is used?

On Wed, Mar 10, 2021 at 10:27 PM sardaesp <[hidden email]> wrote: