Question about modifying operator state on Flink 1.7 with the State Processor API



Kaihao Zhao
Hi,

We are running Flink 1.7, and because of a Kafka cluster migration we need a way to modify the Kafka offsets stored in the FlinkKafkaConsumer's state. Flink 1.9's State Processor API looked like exactly the tool we need, and we were able to modify the operator state with it. However, when we tried to resume the application from the modified savepoint, it failed with ClassNotFoundException: TupleSerializerSnapshot. These TypeSerializerSnapshot classes exist in Flink 1.9 but not in 1.7. Is there any suggestion or workaround for modifying 1.7 state?
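The failure can be pictured with a small, JDK-only model: a savepoint records the class name of each serializer snapshot, and the restoring JVM has to load every one of them. The sketch below is a simplified model, not Flink code; the helper names are hypothetical, and the fully qualified name of TupleSerializerSnapshot used when trying it out is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified, JDK-only model of the restore-time failure: every serializer
 * snapshot class recorded in a savepoint must be loadable by the restoring JVM.
 */
final class SnapshotClassCheck {

    private SnapshotClassCheck() {}

    /** Returns true if this JVM can load the named class (without initializing it). */
    static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, SnapshotClassCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    /** Returns the recorded class names that would fail with ClassNotFoundException here. */
    static List<String> unloadable(List<String> recordedClassNames) {
        List<String> missing = new ArrayList<>();
        for (String name : recordedClassNames) {
            if (!isLoadable(name)) {
                missing.add(name);
            }
        }
        return missing;
    }
}
```

Run with only Flink 1.7's jars on the classpath, a check like this would report the newer snapshot classes before a restore is attempted.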

-- 
Thanks & Regards
Zhao Kaihao

Re: Question about modifying operator state on Flink 1.7 with the State Processor API

vino yang
Hi Kaihao,

Pinging [hidden email] and [hidden email] for more expert suggestions.

What's more, we may need to state explicitly whether the State Processor API can process snapshots generated by jobs running older Flink versions. WDYT?

Best,
Vino

Kaihao Zhao <[hidden email]> wrote on Monday, 25 November 2019 at 11:39 PM: [quoted original message omitted; see above]

Re: Question about modifying operator state on Flink 1.7 with the State Processor API

Kaihao Zhao
Hi Vino/Seth,

Thanks, Vino and Seth. Changing the UID and setting the offsets manually is a solution, but the sticking point is that we have many applications (owned by other users) running on our platform. Doing this manually would be inefficient, and the hardest part is getting users to change their code.
After digging a little into the Flink source code, I think I found a workaround:
1. I modified flink-core 1.9's code in our migration service to skip the snapshot compatibility check and to use TupleSerializerConfigSnapshot/KryoSerializerConfigSnapshot instead of TupleSerializerSnapshot/KryoSerializerSnapshot.
2. I added the State Processor API library to the lib directory of our production Flink clusters.
With that, 1.7 applications can resume from the modified savepoint.
I would appreciate any further suggestions.
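The class substitution in step 1 can be sketched as a simple lookup. This is a hypothetical, JDK-only illustration using the simple class names from the message; it only models the renaming and does not reproduce the actual flink-core patch or the skipped compatibility check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of step 1: swap 1.9-style serializer snapshot classes
 * for the legacy config-snapshot classes that Flink 1.7 can load.
 * Simple class names only; this is not Flink's actual code.
 */
final class LegacySnapshotMapping {

    // Substitutions named in the workaround; any other name is left unchanged.
    private static final Map<String, String> NEW_TO_LEGACY = Map.of(
            "TupleSerializerSnapshot", "TupleSerializerConfigSnapshot",
            "KryoSerializerSnapshot", "KryoSerializerConfigSnapshot");

    private LegacySnapshotMapping() {}

    /** Returns the legacy name for a known 1.9-style snapshot, otherwise the input. */
    static String toLegacy(String snapshotClass) {
        return NEW_TO_LEGACY.getOrDefault(snapshotClass, snapshotClass);
    }

    /** Applies toLegacy to every recorded snapshot class name, preserving order. */
    static List<String> toLegacyAll(List<String> snapshotClasses) {
        List<String> result = new ArrayList<>(snapshotClasses.size());
        for (String name : snapshotClasses) {
            result.add(toLegacy(name));
        }
        return result;
    }
}
```

Leaving unknown names untouched keeps the sketch conservative: only the two substitutions the workaround names are applied.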

Another interesting thing we ran into while modifying the savepoint: the State Processor API currently only supports modifying state by UID, but some of our users do not set UIDs (I know it's not good practice). In this case the Kafka consumer keeps its offsets in the operator (union list) state named topic-partition-offset-states. I can get its operator ID and then hack into the internal data structures to modify the operatorStateIndex, but that is a little tricky. So I suggest adding an overloaded method such as public void addOperator(byte[] operatorID, BootstrapTransformation<?> transformation) to make the non-UID case easier.
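The proposed overload would take the raw operator ID. Flink's operator IDs are 128-bit values (16 bytes) whose string form is 32 hexadecimal characters. The helper below is a hypothetical sketch of decoding such a string into the byte[] that an overload like the suggested addOperator(byte[] operatorID, ...) would accept; neither the overload nor this helper exists in Flink 1.9, and it assumes the hex string is a byte-for-byte encoding of the ID.

```java
/** Hypothetical helper: decodes a 32-hex-character operator ID into 16 bytes. */
final class OperatorIdHex {

    private static final int ID_BYTES = 16;

    private OperatorIdHex() {}

    static byte[] parse(String hex) {
        if (hex == null || hex.length() != ID_BYTES * 2) {
            throw new IllegalArgumentException("expected " + (ID_BYTES * 2) + " hex characters");
        }
        byte[] out = new byte[ID_BYTES];
        for (int i = 0; i < ID_BYTES; i++) {
            // Each byte is encoded as two hex digits, high nibble first.
            int hi = Character.digit(hex.charAt(2 * i), 16);
            int lo = Character.digit(hex.charAt(2 * i + 1), 16);
            if (hi < 0 || lo < 0) {
                throw new IllegalArgumentException("not a hex string: " + hex);
            }
            out[i] = (byte) ((hi << 4) | lo);
        }
        return out;
    }
}
```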

Best,
Kaihao Zhao

On Tue, Nov 26, 2019 at 10:29 AM Seth Wiesman <[hidden email]> wrote:
The State Processor API makes the same savepoint-compatibility guarantees as the rest of Flink: savepoints are backwards compatible for up to three versions, but there are no forward-compatibility guarantees. A savepoint written with 1.9 is not guaranteed to be resumable on a 1.7 cluster.
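The asymmetry can be written down as a toy rule over Flink 1.x minor versions. This is a sketch of the statement above, not an official Flink API; reading "up to three versions" as "the cluster is at most three minor releases newer than the savepoint" is an assumption. A false result means "not guaranteed", not "certain to fail".

```java
/**
 * Toy model of the stated rule for Flink 1.x minor versions: a savepoint is
 * guaranteed resumable only on the same or a newer minor version, and at most
 * BACKWARD_WINDOW releases newer. Older clusters carry no guarantee.
 */
final class SavepointCompatibility {

    // Assumption based on "up to 3 versions"; not an official constant.
    static final int BACKWARD_WINDOW = 3;

    private SavepointCompatibility() {}

    static boolean guaranteedResumable(int savepointMinor, int clusterMinor) {
        int gap = clusterMinor - savepointMinor;
        return gap >= 0 && gap <= BACKWARD_WINDOW;
    }
}
```

Under this model a 1.7 savepoint on a 1.9 cluster is covered, while a 1.9 savepoint on a 1.7 cluster is not, which matches the failure reported in the first message.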

That said, the State Processor API is overkill for this situation.

All you need to do is change the UID of your source operator and set your offsets when configuring the source [1, 2]. When resuming the job, pass the --allowNonRestoredState flag: the source's offsets will be reset, but all other state will be retained.
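Why this works can be shown with a simplified, JDK-only model of restore: savepoint state is matched to the job's operators (in real Flink by an operator ID derived from the UID; here by the UID string directly). State with no matching operator aborts the restore unless non-restored state is allowed, and an operator with no matching state starts fresh. In the real job the renamed source would get a new .uid(...) and its offsets via FlinkKafkaConsumer's setStartFromSpecificOffsets. The class, method, and UIDs below are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Simplified model (not Flink code) of matching savepoint state to a job's
 * operators on restore.
 */
final class RestoreModel {

    private RestoreModel() {}

    static Map<String, String> restore(Map<String, String> savepointStateByUid,
                                       Set<String> jobUids,
                                       boolean allowNonRestoredState) {
        Map<String, String> restored = new HashMap<>();
        for (Map.Entry<String, String> entry : savepointStateByUid.entrySet()) {
            if (jobUids.contains(entry.getKey())) {
                // Operator still exists: its state is restored.
                restored.put(entry.getKey(), entry.getValue());
            } else if (!allowNonRestoredState) {
                // Orphaned state is an error unless explicitly allowed.
                throw new IllegalStateException("Cannot map state for operator " + entry.getKey());
            }
            // Otherwise the orphaned state is silently dropped.
        }
        // Job operators absent from the map start with empty state.
        return restored;
    }
}
```

Renaming the source (say from kafka-source-v1 to kafka-source-v2) orphans only the old offsets, so with the flag set the source starts from its configured offsets while the other operators keep their state. On the command line the flag accompanies the savepoint path, e.g. flink run -s <savepointPath> --allowNonRestoredState ...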


On Mon, Nov 25, 2019 at 6:02 PM vino yang <[hidden email]> wrote: [quoted messages omitted; see above]
--

Seth Wiesman | Solutions Architect

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany



--
Zhao Kaihao