Hi,
We are running Flink 1.7, and because of a recent Kafka cluster migration we need a way to modify the Kafka offsets stored in the FlinkKafkaConnector's state. Flink 1.9's State Processor API looks like exactly the tool we need, and we were able to modify the operator state with it. However, when we tried to resume the app from the modified savepoint, it failed with ClassNotFoundException: TupleSerializerSnapshot. These serializer snapshot classes exist in Flink 1.9 but not in 1.7. Is there any suggestion or workaround for modifying 1.7 state?

Thanks & Regards
Zhao Kaihao
Hi Kaihao,

Pinging [hidden email] and [hidden email] for more expert suggestions.

Also, we may need to state explicitly whether the State Processor API can process snapshots generated by jobs running on older versions. WDYT?

Best,
Vino

Kaihao Zhao <[hidden email]> wrote on Mon, Nov 25, 2019 at 11:39 PM:
Hi Vino/Seth,

After digging a little into the Flink source code, I think I found a workaround:

1. I modified flink-core 1.9's code in our migration service to skip the snapshot compatibility check and to use TupleSerializerConfigSnapshot/KryoSerializerConfigSnapshot instead of TupleSerializerSnapshot/KryoSerializerSnapshot.
2. I added the State Processor API library to the lib directory of our production Flink clusters.

With these changes it works: 1.7 apps can resume from the modified savepoint. Here is the code change I made: https://github.com/mcgG/flink/commit/47661f97ad5ab30ee948805600c1d7f372ae85b2

I would appreciate any further suggestions.

Another interesting thing we ran into while modifying the savepoint: the current State Processor API only supports modifying state by UID, but some of our users do not set UIDs (I know this is not good practice). In this case, the KafkaConnector has the keyed state named kafka-partition-offset-states, so I can get its operatorID and then hack into the internal data structure to modify the operatorStateIndex. It is a little tricky. So I suggest adding an overload such as:

public void addOperator(byte[] operatorID, BootstrapTransformation<?> transformation)

to make this kind of non-UID case easier.

Best,
Kaihao Zhao

On Tue, Nov 26, 2019 at 10:29 AM Seth Wiesman <[hidden email]> wrote:
Zhao Kaihao
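
For readers following the thread, the overload suggested above can be sketched in plain Java. This is only an illustration under stated assumptions: OperatorIdRegistry and Transformation are hypothetical stand-ins and not Flink classes, addOperator is the method name proposed in the message and not an existing Flink method, and the only Flink fact relied on is that an operator ID is 16 bytes, usually printed as 32 hex characters.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a savepoint builder; NOT a Flink class.
final class OperatorIdRegistry {

    // Hypothetical stand-in for a bootstrap transformation; it only carries a label.
    static final class Transformation {
        final String description;

        Transformation(String description) {
            this.description = description;
        }
    }

    // Operator IDs are 16 bytes, usually shown as 32 hex characters.
    private static final int OPERATOR_ID_BYTES = 16;

    // Registered transformations, keyed by the hex form of the operator ID.
    private final Map<String, Transformation> operators = new LinkedHashMap<>();

    // The proposed overload: register a transformation by raw operator ID
    // instead of by UID. The name follows the suggestion in the thread.
    public OperatorIdRegistry addOperator(byte[] operatorId, Transformation transformation) {
        String key = toHex(operatorId);
        if (operators.containsKey(key)) {
            throw new IllegalArgumentException("Operator already registered: " + key);
        }
        operators.put(key, transformation);
        return this;
    }

    // Returns the transformation registered for this operator ID, or null.
    public Transformation lookup(byte[] operatorId) {
        return operators.get(toHex(operatorId));
    }

    // Converts a 16-byte operator ID to its lowercase hex form.
    static String toHex(byte[] operatorId) {
        if (operatorId == null || operatorId.length != OPERATOR_ID_BYTES) {
            throw new IllegalArgumentException("Operator ID must be 16 bytes");
        }
        StringBuilder sb = new StringBuilder(OPERATOR_ID_BYTES * 2);
        for (byte b : operatorId) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    // Parses a 32-character hex string into a 16-byte operator ID.
    static byte[] fromHex(String hex) {
        if (hex == null || hex.length() != OPERATOR_ID_BYTES * 2) {
            throw new IllegalArgumentException("Expected 32 hex characters");
        }
        byte[] out = new byte[OPERATOR_ID_BYTES];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }
}
```

A caller would parse the operator ID with fromHex and pass it to addOperator together with the transformation, so no UID is needed. In a real implementation the UID-based method could presumably delegate to the same code path once the UID has been resolved to an operator ID.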