Error restoring from checkpoint on Flink 1.8

Ning Shi
When testing a job on Flink 1.8, we hit the following error while
resuming from a RocksDB checkpoint. This job has been working well on
Flink 1.6.1. The checkpoint was taken using the exact same job on 1.8.
The operator StreamMap_3c5866a6cc097b462de842b2ef91910d mentioned
in the error message is a RichFlatMapper with TTL state. Why would
"_timer_state" appear in a RichFlatMapper that doesn't use timers? How
should we diagnose or resolve this error?

2019-04-21 01:55:08,616 WARN
org.apache.flink.streaming.api.operators.BackendRestorerProcedure  -
Exception while restoring keyed state backend for
StreamMap_3c5866a6cc097b462de842b2ef91910d_(11/36) from alternative
(1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Caught
unexpected exception.
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:323)
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Error while opening RocksDB instance.
        at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:74)
        at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.openDB(AbstractRocksDBRestoreOperation.java:131)
        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:214)
        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:188)
        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:162)
        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:148)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:267)
        ... 11 more
Caused by: org.rocksdb.RocksDBException: You have to open all column
families. Column families not opened: _timer_state/event_user-timers,
_timer_state/processing_user-timers
        at org.rocksdb.RocksDB.open(Native Method)
        at org.rocksdb.RocksDB.open(RocksDB.java:286)
        at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:66)
        ... 17 more

Thanks,

Ning

Re: Error restoring from checkpoint on Flink 1.8

Ning Shi
For clarification, one of the operators in the chain mentioned in the
error message is a KeyedBroadcastProcessFunction, which I believe
creates an InternalTimerService implicitly. That might be why
"_timer_state" appears in this operator chain. However, it is still a
mystery to me why it worked in Flink 1.6 but not in Flink 1.8. Any
insights would be appreciated.

Ning

Re: Error restoring from checkpoint on Flink 1.8

Congxian Qiu
Hi,
From the error message, it seems Flink can't open RocksDB because of a mismatch in the number of column families. Would you mind sharing a minimal job that reproduces the problem?

Best,
Congxian

Re: Error restoring from checkpoint on Flink 1.8

Ning Shi
Congxian,

Thanks for the reply. I will try to put together a minimal reproducer and post
it to this thread soon.

Ning

Re: Error restoring from checkpoint on Flink 1.8

Ning Shi
Hi Congxian,

I think I have figured out the issue. It's related to the checkpoint directory
collision issue you responded to in the other thread. We reproduced this bug on
1.6.1 after unchaining the operators.

There are two stateful operators in the chain: one is a
CoBroadcastWithKeyedOperator, the other is a StreamMap. The
CoBroadcastWithKeyedOperator creates timer states in RocksDB; the StreamMap
doesn’t. Because of the checkpoint directory collision bug, we always end up
saving the states for CoBroadcastWithKeyedOperator.

After breaking these two operators apart, they both try to restore from the
same set of saved states. When the StreamMap opens the RocksDB files, it does
not open the column families that belong to the other operator, including the
timer states, but RocksDB requires every column family in the database to be
opened. Hence the error.
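
The failure mode described above can be modelled in a few lines of plain Java.
This is only a toy sketch of the rule that every column family present on disk
must be declared when the database is opened; it does not call RocksDB or
Flink. The class name ColumnFamilyCheck, its methods, and the "ttl-state"
family name are assumptions made for illustration. Only the two _timer_state
names are taken from the log above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of the "open all column families" rule; NOT the RocksDB API. */
public class ColumnFamilyCheck {

    /** Returns the on-disk families the caller did not declare, in on-disk order. */
    public static List<String> notOpened(List<String> onDisk, List<String> declared) {
        Set<String> declaredSet = new HashSet<>(declared);
        List<String> missing = new ArrayList<>();
        for (String family : onDisk) {
            if (!declaredSet.contains(family)) {
                missing.add(family);
            }
        }
        return missing;
    }

    /** Mimics the open-time check: fails if any on-disk family is left undeclared. */
    public static void open(List<String> onDisk, List<String> declared) {
        List<String> missing = notOpened(onDisk, declared);
        if (!missing.isEmpty()) {
            throw new IllegalStateException(
                "You have to open all column families. Column families not opened: "
                    + String.join(", ", missing));
        }
    }

    public static void main(String[] args) {
        // Families in the saved files, written by the operator that owns timers.
        // "ttl-state" is a hypothetical name; the timer names come from the log.
        List<String> onDisk = Arrays.asList(
            "default",
            "ttl-state",
            "_timer_state/event_user-timers",
            "_timer_state/processing_user-timers");

        // Families the map operator declares: it has no timers of its own.
        List<String> declaredByMap = Arrays.asList("default", "ttl-state");

        try {
            open(onDisk, declaredByMap);
        } catch (IllegalStateException e) {
            // The message names exactly the families the map operator skipped.
            System.out.println(e.getMessage());
        }
    }
}
```

In this model, the restore succeeds only when the declared list covers
everything on disk, which is why two different operators cannot safely
restore from one shared set of RocksDB files.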

--
Ning

Re: Error restoring from checkpoint on Flink 1.8

Till Rohrmann
For future reference, here is a cross-link to the mailing list thread referred to above [1].


Cheers,
Till