When testing a job on Flink 1.8, we hit the following error while resuming from a RocksDB checkpoint. This job has been working well on Flink 1.6.1, and the checkpoint was taken using the exact same job on 1.8. The operator StreamMap_3c5866a6cc097b462de842b2ef91910d mentioned in the error message is a RichFlatMapper with TTL state. Why would "_timer_state" appear in a RichFlatMapper that doesn't use timers? How should we diagnose or resolve this error?

2019-04-21 01:55:08,616 WARN org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Exception while restoring keyed state backend for StreamMap_3c5866a6cc097b462de842b2ef91910d_(11/36) from alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:323)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Error while opening RocksDB instance.
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:74)
    at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.openDB(AbstractRocksDBRestoreOperation.java:131)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:214)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:188)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:162)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:148)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:267)
    ... 11 more
Caused by: org.rocksdb.RocksDBException: You have to open all column families. Column families not opened: _timer_state/event_user-timers, _timer_state/processing_user-timers
    at org.rocksdb.RocksDB.open(Native Method)
    at org.rocksdb.RocksDB.open(RocksDB.java:286)
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:66)
    ... 17 more

Thanks,
Ning
In reply to Ning Shi's message of Sat, Apr 20, 2019, 10:28 PM:

For clarification, one of the operators in the chain mentioned in the error message is a KeyedBroadcastProcessFunction, which I believe creates an InternalTimerService implicitly. That might be why "_timer_state" appears in this operator chain. However, it is still a mystery to me why it worked in Flink 1.6 but not in Flink 1.8. Any insights would be appreciated.

Ning
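Ning's guess can be made concrete by looking at the two column-family names in the error, "_timer_state/event_user-timers" and "_timer_state/processing_user-timers": they read like one column family per time domain (event time and processing time) for a timer service called "user-timers". The sketch below rebuilds those names under that assumption. The prefix and the naming rule are inferred from the log only, not taken from Flink's source, and TimerColumnFamilies and forTimerService are hypothetical names, not Flink APIs.

```java
import java.util.List;

// Hypothetical helper, not a Flink API: rebuilds the timer column-family
// names seen in the error message, assuming one column family per time
// domain under a common prefix (inferred from the log, not from Flink).
final class TimerColumnFamilies {
    // Assumed prefix, copied from the column-family names in the log.
    static final String PREFIX = "_timer_state/";

    private TimerColumnFamilies() {}

    // Returns the event-time and processing-time column-family names
    // for a timer service with the given name (e.g. "user-timers").
    static List<String> forTimerService(String timerServiceName) {
        return List.of(
                PREFIX + "event_" + timerServiceName,
                PREFIX + "processing_" + timerServiceName);
    }

    public static void main(String[] args) {
        // Prints [_timer_state/event_user-timers, _timer_state/processing_user-timers]
        System.out.println(forTimerService("user-timers"));
    }
}
```

Under this reading, only an operator that actually creates a timer service contributes these two column families, which fits the suspicion that the KeyedBroadcastProcessFunction in the chain, not the flat-mapper, is their source.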
In reply to Ning Shi's message of Sunday, April 21, 2019, 10:56 AM:

Hi,

From the given error message, it seems Flink can't open RocksDB because the number of column families doesn't match. Would you mind sharing a minimal job that reproduces this problem?

Best,
Congxian
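The mismatch Congxian describes comes from a RocksDB rule: when a database that already contains several column families is opened, every one of them has to be listed, otherwise the open fails with the "You have to open all column families" error seen above. The following toy model mimics that check in plain Java. It is not RocksDB's implementation, and the class name, the method names and the "ttl-state" column family are hypothetical, chosen only to mirror this job.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model of RocksDB's rule behind "You have to open all column families":
// every column family that exists on disk must be named when opening.
// Not RocksDB's implementation; all names here are hypothetical.
final class ColumnFamilyCheck {
    private ColumnFamilyCheck() {}

    // Column families present on disk that the opener did not request,
    // in the iteration order of onDisk.
    static List<String> missingColumnFamilies(Set<String> onDisk, Set<String> requested) {
        List<String> missing = new ArrayList<>();
        for (String cf : onDisk) {
            if (!requested.contains(cf)) {
                missing.add(cf);
            }
        }
        return missing;
    }

    // Throws, with a message shaped like the one in the log, if any
    // on-disk column family was left out of the request.
    static void open(Set<String> onDisk, Set<String> requested) {
        List<String> missing = missingColumnFamilies(onDisk, requested);
        if (!missing.isEmpty()) {
            throw new IllegalStateException(
                    "You have to open all column families. Column families not opened: "
                            + String.join(", ", missing));
        }
    }

    public static void main(String[] args) {
        // Hypothetical on-disk contents: RocksDB's "default" column family,
        // one keyed state, and the two timer column families from the log.
        Set<String> onDisk = new LinkedHashSet<>(List.of(
                "default", "ttl-state",
                "_timer_state/event_user-timers", "_timer_state/processing_user-timers"));
        // The opener only knows about its own state.
        Set<String> requested = Set.of("default", "ttl-state");
        try {
            open(onDisk, requested);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the model is that an extra column family on disk is as fatal as a missing one: the opener cannot simply ignore state it does not recognise.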
In reply to Congxian Qiu's message of Sun, 21 Apr 2019 09:27:12 -0400:

Congxian,

Thanks for the reply. I will try to put together a minimal reproducer and post it to this thread soon.

Ning
In reply to this post by Congxian Qiu
Hi Congxian,

I think I have figured out the issue. It's related to the checkpoint directory collision issue you responded to in the other thread. We reproduced this bug on 1.6.1 after unchaining the operators.

There are two stateful operators in the chain: one is a CoBroadcastWithKeyedOperator, and the other is a StreamMapper. The CoBroadcastWithKeyedOperator creates timer states in RocksDB; the StreamMapper doesn't. Because of the checkpoint directory collision bug, we always end up saving only the states of the CoBroadcastWithKeyedOperator. After we break these two operators apart, both try to restore from that same set of saved states. When the StreamMapper opens the RocksDB files, it only opens the column families it knows about, so the timer-state column families in those files are left unopened, which RocksDB does not allow. Hence the error.

--
Ning
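The failure sequence described above can be replayed in a small toy model: two chained operators snapshot into a directory whose name does not include the operator, so the later snapshot replaces the earlier one; once the operators are unchained, the StreamMapper restores files that only hold the other operator's timer column families. All names here are hypothetical, including the "chk-42/subtask-N" path scheme, the operator labels and the "ttl-state" column family. The model does not reproduce Flink's real checkpoint layout, and it assumes for simplicity that the keyed broadcast operator's only RocksDB column families are the two timer ones and that it is the last writer.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.BiFunction;

// Toy model of a checkpoint directory collision between chained operators.
// Directory and operator names are hypothetical, not Flink's real layout.
final class CollisionModel {
    private CollisionModel() {}

    // Hypothetical colliding scheme: the operator is ignored, so all chained
    // operators of one subtask share a directory.
    static String collidingPath(int subtask, String operator) {
        return "chk-42/subtask-" + subtask;
    }

    // Hypothetical non-colliding scheme: each operator gets its own directory.
    static String uniquePath(int subtask, String operator) {
        return "chk-42/subtask-" + subtask + "/" + operator;
    }

    // Snapshots each operator's column families in map order; a later write
    // to the same directory replaces the earlier one (last writer wins).
    static Map<String, Set<String>> snapshot(
            BiFunction<Integer, String, String> scheme, int subtask,
            Map<String, Set<String>> columnFamiliesByOperator) {
        Map<String, Set<String>> storage = new HashMap<>();
        for (Map.Entry<String, Set<String>> e : columnFamiliesByOperator.entrySet()) {
            storage.put(scheme.apply(subtask, e.getKey()), e.getValue());
        }
        return storage;
    }

    // Column families found in the operator's restored directory that the
    // operator does not declare itself, i.e. the ones it leaves unopened.
    static Set<String> unopenedOnRestore(
            BiFunction<Integer, String, String> scheme, int subtask,
            Map<String, Set<String>> storage, String operator, Set<String> declared) {
        Set<String> found = storage.getOrDefault(scheme.apply(subtask, operator), Set.of());
        Set<String> unopened = new TreeSet<>(found);
        unopened.removeAll(declared);
        return unopened;
    }

    // The chain from the thread: the StreamMapper first, the keyed broadcast
    // operator second, so the latter is the last writer.
    static Map<String, Set<String>> chain() {
        Map<String, Set<String>> chain = new LinkedHashMap<>();
        chain.put("StreamMap", Set.of("ttl-state"));
        chain.put("CoBroadcastWithKeyed", Set.of(
                "_timer_state/event_user-timers", "_timer_state/processing_user-timers"));
        return chain;
    }

    public static void main(String[] args) {
        List<BiFunction<Integer, String, String>> schemes =
                List.of(CollisionModel::collidingPath, CollisionModel::uniquePath);
        for (BiFunction<Integer, String, String> scheme : schemes) {
            Map<String, Set<String>> storage = snapshot(scheme, 11, chain());
            System.out.println(unopenedOnRestore(scheme, 11, storage, "StreamMap", Set.of("ttl-state")));
        }
    }
}
```

Running main prints the two timer column families for the colliding scheme and an empty set when each operator gets its own directory, mirroring the restore failure and a clean restore respectively.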
In reply to Ning Shi's message of Wed, Apr 24, 2019, 4:00 AM:

For future reference, here is a cross-link to the mailing-list thread Ning referred to [1].

[1] http://mail-archives.apache.org/mod_mbox/flink-user/201904.mbox/%3Cm2ef5tpfwy.wl-ningshi2@...%3E

Cheers,
Till