Zookeeper connection loss causing checkpoint corruption


Zookeeper connection loss causing checkpoint corruption

pwestermann

I recently ran into an issue with our Flink cluster: a ZooKeeper service deployment caused a temporary connection loss and triggered a new JobManager leader election. The election was successful and our Flink job restarted from the last checkpoint.

That checkpoint appears to have been taken while we lost the connection to ZooKeeper and ended up in a corrupted state, so the Flink job kept failing. Here’s the exception stack trace:

2020-09-18 01:10:57

java.lang.Exception: Exception while creating StreamOperatorStateContext.

     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)

     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)

     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)

     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)

     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)

     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)

     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)

     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)

     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)

     at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedCoProcessOperator_a2b0d706714fc3856e0e38da3c54b7de_(27/40) from any of the 1 provided restore options.

     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)

     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)

     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)

     ... 9 more

Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.

     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:335)

     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)

     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)

     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)

     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)

     ... 11 more

Caused by: java.io.IOException: Error while opening RocksDB instance.

     at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:89)

     at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.openDB(AbstractRocksDBRestoreOperation.java:131)

     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:220)

     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)

     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:169)

     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:155)

     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:276)

     ... 15 more

Caused by: org.rocksdb.RocksDBException: Sst file size mismatch: /mnt/data/tmp/flink-io-7de3e9df-f6d5-49f2-a92a-200d3c45d64d/job_2f6bc855799715e7846e2b95a9c01e6a_op_KeyedCoProcessOperator_a2b0d706714fc3856e0e38da3c54b7de__27_40__uuid_0aaa0092-2b59-48ef-b560-727a9e81a64f/db/012554.sst. Size recorded in manifest 5309, actual size 1199

Sst file size mismatch: /mnt/data/tmp/flink-io-7de3e9df-f6d5-49f2-a92a-200d3c45d64d/job_2f6bc855799715e7846e2b95a9c01e6a_op_KeyedCoProcessOperator_a2b0d706714fc3856e0e38da3c54b7de__27_40__uuid_0aaa0092-2b59-48ef-b560-727a9e81a64f/db/012548.sst. Size recorded in manifest 654588, actual size 1541818

 

     at org.rocksdb.RocksDB.open(Native Method)

     at org.rocksdb.RocksDB.open(RocksDB.java:286)

     at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:77)

     ... 21 more

 

This is Flink 1.10.2 with ZooKeeper for HA, S3 as the state backend, and incremental checkpoints enabled. I manually stopped the job and restarted it from the previous checkpoint.
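
For reference, the manual recovery was roughly the following (the job ID, jar name, and checkpoint path are placeholders, not our actual ones):

    # cancel the failing job
    flink cancel <jobId>
    # resubmit, restoring from a retained checkpoint's _metadata file
    flink run -d -s s3://<bucket>/checkpoints/<jobId>/chk-<n>/_metadata our-job.jar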

 

This leads me to two questions:

  • Is there a way to avoid corrupted checkpoints, or is this just a case of bad timing that we have to live with?
  • Would it be possible to automate the recovery and fall back to a previous checkpoint if a checkpoint repeatedly fails to load? (See the config sketch below.)
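
For context on the second question: falling back, even manually, only works while older checkpoints are still retained (by default only the latest one is kept), e.g. with something like this in flink-conf.yaml (illustrative values, not our exact config):

    state.backend: rocksdb
    state.backend.incremental: true
    state.checkpoints.dir: s3://<bucket>/checkpoints
    # keep several completed checkpoints so an older one is available as a fallback
    state.checkpoints.num-retained: 3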

 

Thanks,

Peter


Re: Zookeeper connection loss causing checkpoint corruption

Arpith P
Hi Peter,

I recently had a similar issue where I could not load from the checkpoint path. I found that whenever a checkpoint is corrupted, its "_metadata" file is not persisted. I have a program that uses this to track checkpoint locations: it records each completed checkpoint's location and timestamp in a DB, and to restore the latest checkpoint I query the DB ordered by timestamp descending. Let me know if this is helpful; I can share the code for this if needed.
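
To give a rough idea, here is a minimal sketch of the detection part (not my actual code; it assumes the checkpoint root is on a locally accessible filesystem and all names are made up — with S3 you would list objects via the S3 client and write the result to the DB instead of printing it):

    import java.io.IOException;
    import java.nio.file.DirectoryStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.attribute.FileTime;
    import java.util.Optional;

    public class LatestCompletedCheckpoint {

        // A checkpoint that completed successfully has a "_metadata" file in its chk-* directory;
        // incomplete or corrupt checkpoints do not, so those directories are skipped.
        static Optional<Path> findLatest(Path checkpointRoot) throws IOException {
            Path newest = null;
            FileTime newestTime = null;
            try (DirectoryStream<Path> dirs = Files.newDirectoryStream(checkpointRoot, "chk-*")) {
                for (Path dir : dirs) {
                    Path metadata = dir.resolve("_metadata");
                    if (!Files.exists(metadata)) {
                        continue; // no _metadata -> checkpoint never completed, ignore it
                    }
                    FileTime modified = Files.getLastModifiedTime(metadata);
                    if (newestTime == null || modified.compareTo(newestTime) > 0) {
                        newest = dir;
                        newestTime = modified;
                    }
                }
            }
            return Optional.ofNullable(newest);
        }

        public static void main(String[] args) throws IOException {
            // hypothetical checkpoint root for one job, e.g. <checkpoint-dir>/<job-id>
            Path root = Paths.get(args[0]);
            findLatest(root).ifPresentOrElse(
                    dir -> System.out.println("Latest completed checkpoint: " + dir),
                    () -> System.err.println("No completed checkpoint found under " + root));
        }
    }

In my setup the same check runs periodically and writes the location plus a timestamp into a DB table; the restore step simply takes the newest row and passes its path to the job on restart.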

Arpith


Re: Zookeeper connection loss causing checkpoint corruption

Timo Walther
Hi Arpith,

is there a JIRA ticket for this issue already? If not, it would be great
if you can report it. This sounds like a critical priority issue to me.

Thanks,
Timo


Re: Zookeeper connection loss causing checkpoint corruption

Arpith P
I created a ticket with all my findings. https://issues.apache.org/jira/browse/FLINK-19359.

Thanks,
Arpith
