Hello,

We are trying to update our Kafka connector dependency. So far we have been using flink-connector-kafka-0.11, and we would like to update the dependency to flink-connector-kafka. However, when I try to restart the job from a savepoint, I get the following exception:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSink_351727121bb1ca0d704092960989d25b_(1/10) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:255)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143)
    ... 5 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:537)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:246)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 7 more
Caused by: java.io.IOException: Could not find class 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011$NextTransactionalIdHint' in classpath.
    at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:711)
    at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:681)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.readSnapshotData(PojoSerializerSnapshotData.java:178)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.createFrom(PojoSerializerSnapshotData.java:122)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshot.readSnapshot(PojoSerializerSnapshot.java:125)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:170)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
    at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
    at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
    ... 11 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011$NextTransactionalIdHint
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:120)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:708)
    ... 23 more

It seems like the state is saved including the classes. Is it possible to do a migration in some way where we can update the dependency and keep the state?

Regards,
Nikola Hrusov
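
For readers following the trace: the innermost frames (InstantiationUtil.resolveClassByName calling Class.forName) suggest that the savepoint stores the class name of the serialized type and that the restore looks it up by name on the job's classpath. The following is only a minimal sketch of that kind of lookup, not Flink's actual code; the class ResolveByNameSketch and the helper isResolvable are hypothetical names introduced for illustration. The class name string is copied from the exception above.

```java
// Hypothetical illustration; not part of Flink.
class ResolveByNameSketch {

    // Class name copied verbatim from the ClassNotFoundException in the trace.
    static final String HINT_CLASS =
        "org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011$NextTransactionalIdHint";

    /** Returns true if the named class can be found through the given class loader. */
    static boolean isResolvable(String className, ClassLoader loader) {
        try {
            // initialize=false: only locate the class, do not run static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            // Same failure mode as in the trace: the name is known, the class is not on the classpath.
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = ResolveByNameSketch.class.getClassLoader();
        System.out.println("java.lang.String resolvable: "
            + isResolvable("java.lang.String", loader));
        System.out.println(HINT_CLASS + " resolvable: "
            + isResolvable(HINT_CLASS, loader));
    }
}
```

Run with the old 0.11 connector jar on the classpath, the second lookup would be expected to succeed; with only the universal connector present, it would be expected to fail in the way the exception describes.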
Hi Nikola,

If I remember correctly, state is not compatible between flink-connector-kafka-0.11 and the universal flink-connector-kafka. Piotr (cc'ed) would probably know what's going on here.

Cheers,
Gordon

On Mon, Aug 10, 2020 at 1:07 PM Nikola Hrusov <[hidden email]> wrote:
Hi Nikola,

Which Flink version are you using? Can you describe step by step what you are doing? The error you are seeing should have been fixed in Flink 1.9.0+ [1], so if you are using an older version of Flink, please upgrade Flink first, without changing the job, and only then upgrade the connector.

Piotrek

On Mon, Aug 10, 2020 at 08:14, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
(adding back user mailing list)

1. Upgrade Flink to 1.11.1 without upgrading the connector
2. Take a new savepoint
3. Upgrade the connector to the universal one
4. Restore the upgraded job from the new savepoint (taken in step 2)

If it doesn't work, please let us know.

Piotrek

On Wed, Aug 12, 2020 at 09:53, Nikola Hrusov <[hidden email]> wrote: