I recently ran into an issue with our Flink cluster: A ZooKeeper service deploy caused a temporary connection loss and triggered a new jobmanager leader election. Leader election was successful and our Flink job restarted from the last checkpoint. This checkpoint appears to have been taken while we lost connection to ZooKeeper and ended up in a corrupted state, so the Flink job kept failing. Here's the exception stack trace for that:

2020-09-18 01:10:57
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedCoProcessOperator_a2b0d706714fc3856e0e38da3c54b7de_(27/40) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:335)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.io.IOException: Error while opening RocksDB instance.
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:89)
    at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.openDB(AbstractRocksDBRestoreOperation.java:131)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:220)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:169)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:155)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:276)
    ... 15 more
Caused by: org.rocksdb.RocksDBException: Sst file size mismatch: /mnt/data/tmp/flink-io-7de3e9df-f6d5-49f2-a92a-200d3c45d64d/job_2f6bc855799715e7846e2b95a9c01e6a_op_KeyedCoProcessOperator_a2b0d706714fc3856e0e38da3c54b7de__27_40__uuid_0aaa0092-2b59-48ef-b560-727a9e81a64f/db/012554.sst. Size recorded in manifest 5309, actual size 1199
Sst file size mismatch: /mnt/data/tmp/flink-io-7de3e9df-f6d5-49f2-a92a-200d3c45d64d/job_2f6bc855799715e7846e2b95a9c01e6a_op_KeyedCoProcessOperator_a2b0d706714fc3856e0e38da3c54b7de__27_40__uuid_0aaa0092-2b59-48ef-b560-727a9e81a64f/db/012548.sst. Size recorded in manifest 654588, actual size 1541818
    at org.rocksdb.RocksDB.open(Native Method)
    at org.rocksdb.RocksDB.open(RocksDB.java:286)
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:77)
    ... 21 more

This is for Flink 1.10.2 with ZooKeeper for HA, S3 as a state backend and incremental checkpoints. I manually stopped the job and restarted it from the previous checkpoint.
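For context, a setup like this (RocksDB with incremental checkpoints on S3 and retained/externalized checkpoints) looks roughly as follows in the Flink 1.10 DataStream API; the bucket path and interval are placeholders, and the ZooKeeper HA settings live in flink-conf.yaml rather than in code:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobSetup {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 60 seconds (placeholder interval).
        env.enableCheckpointing(60_000);

        // Retain completed checkpoints so the job can be resubmitted from them manually.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // RocksDB state backend with incremental checkpoints (second argument),
        // writing to S3; the bucket/path is a placeholder.
        env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/flink/checkpoints", true));

        // ... sources, operators, sinks and env.execute() omitted.
    }
}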
This leads me to two questions:

  * Is there a way to avoid corrupted checkpoints or is this just a case of bad timing that we have to live with?
  * Would it be possible to automate the recovery and fall back to a previous checkpoint if a checkpoint cannot be loaded repeatedly?

Thanks,
Peter
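Regarding the second question above: as far as I know, Flink 1.10 has no built-in mechanism that falls back to an older checkpoint when a restore keeps failing, so any such automation has to run outside the job. Below is a rough sketch of one building block, using Flink's FileSystem API to find the newest retained checkpoint directory that still contains a "_metadata" file (see Arpith's reply below); the chk-* layout and the checkpoint root argument are assumptions about a standard externalized-checkpoint setup, not an official recipe.

import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Optional;

import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class LatestValidCheckpoint {

    // Returns the newest chk-<n> directory under checkpointRoot (e.g. the
    // <checkpoint-dir>/<job-id> directory) that contains a _metadata file.
    public static Optional<Path> newestCompleteCheckpoint(String checkpointRoot) throws IOException {
        Path root = new Path(checkpointRoot);
        FileSystem fs = root.getFileSystem();

        FileStatus[] children = fs.listStatus(root);
        if (children == null) {
            return Optional.empty();
        }

        return Arrays.stream(children)
                .filter(FileStatus::isDir)
                .filter(s -> s.getPath().getName().startsWith("chk-"))
                // Newest first, judged by the numeric suffix of chk-<n>.
                .sorted(Comparator.comparingLong(
                        (FileStatus s) -> Long.parseLong(s.getPath().getName().substring(4))).reversed())
                .map(FileStatus::getPath)
                // Keep only checkpoints whose _metadata file was actually persisted.
                .filter(p -> {
                    try {
                        return fs.exists(new Path(p, "_metadata"));
                    } catch (IOException e) {
                        return false;
                    }
                })
                .findFirst();
    }
}

The returned path could then be handed to "flink run -s <checkpoint path>" when resubmitting the job.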
Hi Peter,

I have recently had a similar issue where I could not load from the checkpoints path. I found that whenever a corrupt checkpoint happens, the "_metadata" file will not be persisted, so I have a program that tracks checkpoint locations based on this strategy and updates a DB with each location and its timestamp. To restore the latest checkpoint, I query the DB ordered by the latest timestamp. Let me know if this is helpful; I can share code for this if needed.

Arpith

On Mon, Sep 21, 2020 at 6:37 PM Peter Westermann <[hidden email]> wrote:
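A minimal sketch of the kind of tracker Arpith describes above: whenever a checkpoint directory has a persisted "_metadata" file, record its path and a timestamp in a database, and query the most recent entry when a restore is needed. The table name, columns, and JDBC URL are hypothetical, and the schema is deliberately minimal.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Optional;

public class CheckpointTracker {

    private final String jdbcUrl;

    public CheckpointTracker(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    // Call this after verifying that <checkpointPath>/_metadata exists.
    public void recordCompletedCheckpoint(String checkpointPath) throws Exception {
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             PreparedStatement stmt = conn.prepareStatement(
                     "INSERT INTO checkpoints (path, completed_at) VALUES (?, ?)")) {
            stmt.setString(1, checkpointPath);
            stmt.setTimestamp(2, Timestamp.from(Instant.now()));
            stmt.executeUpdate();
        }
    }

    // Latest known-good checkpoint path, ordered by timestamp as described above.
    public Optional<String> latestCheckpoint() throws Exception {
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             PreparedStatement stmt = conn.prepareStatement(
                     "SELECT path FROM checkpoints ORDER BY completed_at DESC LIMIT 1");
             ResultSet rs = stmt.executeQuery()) {
            return rs.next() ? Optional.of(rs.getString(1)) : Optional.empty();
        }
    }
}

A small scheduled job, or a hook in whatever deploys the Flink job, would call recordCompletedCheckpoint after checking for "_metadata", and the restore tooling would use latestCheckpoint to pick the path to resume from.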
Hi Arpith,
is there a JIRA ticket for this issue already? If not, it would be great if you can report it. This sounds like a critical priority issue to me.

Thanks,
Timo

On 22.09.20 06:25, Arpith P wrote:
I created a ticket with all my findings: https://issues.apache.org/jira/browse/FLINK-19359

Thanks,
Arpith

On Tue, Sep 22, 2020 at 12:16 PM Timo Walther <[hidden email]> wrote: