We have a Flink job using the RocksDB state backend. We found that the state of one of the RichMapFunctions was not being saved in checkpoints or savepoints. After some digging, it seems that two operators in the same operator chain collide with each other during a checkpoint or savepoint, resulting in one operator's state going missing. I extracted the checkpoint directories for all operators from the RocksDB LOG files for one of the checkpoints. As you can see below, the StreamMap operator shares the same checkpoint directory as the CoBroadcastWithKeyedOperator. They are in the same operator chain.

/var/flink/data/localState/aid_AllocationID{37a99d74a8e452ff06257c61ab13a3c8}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_54b7f0cbe350c274d942032aa504dbdd_sti_1/chk_21244/rocks_db  CoStreamFlatMap_54b7f0cbe350c274d942032aa504dbdd__2_90__
/var/flink/data/localState/aid_AllocationID{37a99d74a8e452ff06257c61ab13a3c8}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_53/chk_21244/rocks_db  CoBroadcastWithKeyedOperator_567adb020dcc57a12c17bd43c00b0f55__54_90__
/var/flink/data/localState/aid_AllocationID{37a99d74a8e452ff06257c61ab13a3c8}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_53/chk_21244/rocks_db  StreamMap_3c5866a6cc097b462de842b2ef91910d__54_90__
/var/flink/data/localState/aid_AllocationID{37a99d74a8e452ff06257c61ab13a3c8}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_bc2936094388a70852534bd6c0fce178_sti_77/chk_21244/rocks_db  WindowOperator_bc2936094388a70852534bd6c0fce178__78_90__
/var/flink/data/localState/aid_AllocationID{5cde66b8a81c5202f7685928bb18ac00}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_54b7f0cbe350c274d942032aa504dbdd_sti_84/chk_21244/rocks_db  CoStreamFlatMap_54b7f0cbe350c274d942032aa504dbdd__85_90__
/var/flink/data/localState/aid_AllocationID{5cde66b8a81c5202f7685928bb18ac00}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_66/chk_21244/rocks_db  CoBroadcastWithKeyedOperator_567adb020dcc57a12c17bd43c00b0f55__67_90__
/var/flink/data/localState/aid_AllocationID{5cde66b8a81c5202f7685928bb18ac00}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_66/chk_21244/rocks_db  StreamMap_3c5866a6cc097b462de842b2ef91910d__67_90__
/var/flink/data/localState/aid_AllocationID{5cde66b8a81c5202f7685928bb18ac00}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_bc2936094388a70852534bd6c0fce178_sti_15/chk_21244/rocks_db  WindowOperator_bc2936094388a70852534bd6c0fce178__16_90__
/var/flink/data/localState/aid_AllocationID{61cf4a285199ab779ec85784980d15e2}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_54b7f0cbe350c274d942032aa504dbdd_sti_53/chk_21244/rocks_db  CoStreamFlatMap_54b7f0cbe350c274d942032aa504dbdd__54_90__
/var/flink/data/localState/aid_AllocationID{61cf4a285199ab779ec85784980d15e2}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_1/chk_21244/rocks_db  CoBroadcastWithKeyedOperator_567adb020dcc57a12c17bd43c00b0f55__2_90__
/var/flink/data/localState/aid_AllocationID{61cf4a285199ab779ec85784980d15e2}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_1/chk_21244/rocks_db  StreamMap_3c5866a6cc097b462de842b2ef91910d__2_90__
/var/flink/data/localState/aid_AllocationID{61cf4a285199ab779ec85784980d15e2}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_bc2936094388a70852534bd6c0fce178_sti_56/chk_21244/rocks_db  WindowOperator_bc2936094388a70852534bd6c0fce178__57_90__
/var/flink/data/localState/aid_AllocationID{a93df5bf51a4f9b3673cd18b46abbecb}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_54b7f0cbe350c274d942032aa504dbdd_sti_30/chk_21244/rocks_db  CoStreamFlatMap_54b7f0cbe350c274d942032aa504dbdd__31_90__
/var/flink/data/localState/aid_AllocationID{a93df5bf51a4f9b3673cd18b46abbecb}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_46/chk_21244/rocks_db  CoBroadcastWithKeyedOperator_567adb020dcc57a12c17bd43c00b0f55__47_90__
/var/flink/data/localState/aid_AllocationID{a93df5bf51a4f9b3673cd18b46abbecb}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_46/chk_21244/rocks_db  StreamMap_3c5866a6cc097b462de842b2ef91910d__47_90__
/var/flink/data/localState/aid_AllocationID{a93df5bf51a4f9b3673cd18b46abbecb}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_bc2936094388a70852534bd6c0fce178_sti_12/chk_21244/rocks_db  WindowOperator_bc2936094388a70852534bd6c0fce178__13_90__
/var/flink/data/localState/aid_AllocationID{f6241daa33001250c3f2934a8ba6b506}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_54b7f0cbe350c274d942032aa504dbdd_sti_46/chk_21244/rocks_db  CoStreamFlatMap_54b7f0cbe350c274d942032aa504dbdd__47_90__
/var/flink/data/localState/aid_AllocationID{f6241daa33001250c3f2934a8ba6b506}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_30/chk_21244/rocks_db  CoBroadcastWithKeyedOperator_567adb020dcc57a12c17bd43c00b0f55__31_90__
/var/flink/data/localState/aid_AllocationID{f6241daa33001250c3f2934a8ba6b506}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_30/chk_21244/rocks_db  StreamMap_3c5866a6cc097b462de842b2ef91910d__31_90__
/var/flink/data/localState/aid_AllocationID{f6241daa33001250c3f2934a8ba6b506}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_bc2936094388a70852534bd6c0fce178_sti_79/chk_21244/rocks_db  WindowOperator_bc2936094388a70852534bd6c0fce178__80_90__
/var/flink/data/localState/aid_AllocationID{fbf10f2769de14f7328de6aa3c056515}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_54b7f0cbe350c274d942032aa504dbdd_sti_66/chk_21244/rocks_db  CoStreamFlatMap_54b7f0cbe350c274d942032aa504dbdd__67_90__
/var/flink/data/localState/aid_AllocationID{fbf10f2769de14f7328de6aa3c056515}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_84/chk_21244/rocks_db  CoBroadcastWithKeyedOperator_567adb020dcc57a12c17bd43c00b0f55__85_90__
/var/flink/data/localState/aid_AllocationID{fbf10f2769de14f7328de6aa3c056515}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_84/chk_21244/rocks_db  StreamMap_3c5866a6cc097b462de842b2ef91910d__85_90__
/var/flink/data/localState/aid_AllocationID{fbf10f2769de14f7328de6aa3c056515}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_bc2936094388a70852534bd6c0fce178_sti_58/chk_21244/rocks_db  WindowOperator_bc2936094388a70852534bd6c0fce178__59_90__

After each checkpoint, when I checked the checkpoint directory for the StreamMap operator's state, the SST files were not there. Restoring a new job from the same checkpoint or savepoint also confirmed that the StreamMap state was missing, with no error reported by Flink. I also used strace to capture file I/O during checkpoints. I could see that the StreamMap operator succeeded in creating the checkpoint directory, but immediately after that it received many "-1 ENOENT (No such file or directory)" errors, possibly because the directory was overwritten by the other operator. Is this a known issue? It seems that the UUID generation for chained operators does not differentiate the two operators, resulting in data loss. Thanks, Ning
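P.S. For anyone trying to reproduce this, below is a rough, hypothetical sketch (simplified names and types, not our actual code) of one way a CoBroadcastWithKeyedOperator and a keyed-state StreamMap can end up in the same operator chain. A plain keyBy between them would break the chain, so the sketch assumes the keyed stream is reinterpreted rather than re-partitioned:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class ChainedKeyedStateSketch {

    // Identity key selector used for both keyed steps in this sketch.
    private static final KeySelector<String, String> KEY = new KeySelector<String, String>() {
        @Override
        public String getKey(String value) {
            return value;
        }
    };

    static DataStream<String> build(DataStream<String> events, DataStream<String> rules) {
        // Broadcast state descriptor for the rule stream.
        MapStateDescriptor<String, String> ruleDesc =
                new MapStateDescriptor<>("rules", String.class, String.class);
        BroadcastStream<String> broadcastRules = rules.broadcast(ruleDesc);

        // keyed stream + broadcast stream => CoBroadcastWithKeyedOperator.
        DataStream<String> enriched = events
                .keyBy(KEY)
                .connect(broadcastRules)
                .process(new KeyedBroadcastProcessFunction<String, String, String, String>() {
                    @Override
                    public void processElement(String value, ReadOnlyContext ctx,
                                               Collector<String> out) {
                        out.collect(value);
                    }

                    @Override
                    public void processBroadcastElement(String rule, Context ctx,
                                                        Collector<String> out) {
                        // broadcast state would be updated here
                    }
                });

        // Reinterpreting the output as keyed (instead of a new keyBy) keeps the
        // downstream map in the same operator chain while still allowing keyed state.
        KeyedStream<String, String> rekeyed =
                DataStreamUtils.reinterpretAsKeyedStream(enriched, KEY);

        // Stateful StreamMap: RichMapFunction with keyed ValueState.
        return rekeyed.map(new RichMapFunction<String, String>() {
            private transient ValueState<Long> count;

            @Override
            public void open(Configuration parameters) {
                count = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("count", Long.class));
            }

            @Override
            public String map(String value) throws Exception {
                Long c = count.value();
                count.update(c == null ? 1L : c + 1);
                return value;
            }
        });
    }
}

With a topology like this, both operators keep keyed RocksDB state inside the same task, which matches the directory listing above.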
Hi, Ning
From the log messages you gave, the two operators share the same directory, and when a snapshot is taken, the directory is deleted first if it already exists (RocksIncrementalSnapshotStrategy#prepareLocalSnapshotDirectory).
I did not find an existing issue for this problem, and I don't think this is a problem with the UUID generation; please check the path generation logic in LocalRecoveryDirectoryProviderImpl#subtaskSpecificCheckpointDirectory.
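For illustration only, here is a rough sketch (a simplified approximation based on the paths in your log, not the actual Flink source) of how that path construction and the snapshot directory preparation appear to behave:

import java.io.File;

public class LocalSnapshotDirSketch {

    // Rough approximation of the path derivation: every component comes from the
    // task (allocation ID, job ID, JobVertex ID, subtask index, checkpoint ID)
    // and none from the operator ID.
    static File subtaskSpecificCheckpointDirectory(
            File allocationBaseDir,   // e.g. .../localState/aid_AllocationID{...}
            String jobId,
            String jobVertexId,
            int subtaskIndex,
            long checkpointId) {
        String path = String.format("jid_%s/vtx_%s_sti_%d/chk_%d",
                jobId, jobVertexId, subtaskIndex, checkpointId);
        return new File(allocationBaseDir, path);
    }

    // Roughly the behaviour described above for
    // RocksIncrementalSnapshotStrategy#prepareLocalSnapshotDirectory: if the
    // directory already exists (e.g. because the other chained operator created
    // it first), delete it, then recreate it.
    static File prepareLocalSnapshotDirectory(File chkDir) {
        File rocksDbDir = new File(chkDir, "rocks_db");
        if (rocksDbDir.exists()) {
            deleteRecursively(rocksDbDir);
        }
        if (!rocksDbDir.mkdirs()) {
            throw new IllegalStateException("Could not create " + rocksDbDir);
        }
        return rocksDbDir;
    }

    static void deleteRecursively(File f) {
        File[] children = f.listFiles();
        if (children != null) {
            for (File c : children) {
                deleteRecursively(c);
            }
        }
        f.delete();
    }
}

Because no operator ID appears in the path, two RocksDB backends running in the same chain resolve to the same rocks_db directory, and the second operator's prepare step wipes whatever the first one has already written.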
I’ve created an issue for this problem.
Best, Congxian
Congxian,
Thank you for creating the ticket and providing the relevant code. I'm curious why you don't think the directory collision is a problem. What we observe is that one of the operators' state is not included in the checkpoint, and data is lost on restore. That's a pretty serious problem, especially since Flink doesn't report any error in the log. People could potentially be losing state silently. Please let me know how I can best help diagnose this issue and drive the ticket forward. I'm happy to collect any relevant information. Thanks, — Ning
Congxian,
We just ran a test. Taking the two stateful operators out of the same chain seems to have worked around the problem; the state for both of them is now saved successfully in the checkpoint. Ning
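In case it helps others, the unchaining looked roughly like the sketch below (simplified and hypothetical, not our actual job; the MapFunction is a stateless placeholder for the real stateful RichMapFunction, and the uid is made up):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnchainWorkaroundSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Option 1: turn operator chaining off for the whole job.
        // env.disableOperatorChaining();

        DataStream<String> input = env.fromElements("a", "b", "c");

        // Option 2: break the chain only around the affected stateful operator.
        DataStream<String> mapped = input
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                .uid("stateful-map")      // hypothetical uid
                .disableChaining();       // or .startNewChain()

        mapped.print();
        env.execute("unchain-workaround-sketch");
    }
}

disableChaining() isolates the operator from both neighbours, startNewChain() only breaks the chain towards the upstream operator, and env.disableOperatorChaining() turns chaining off for the entire job.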
Hi Ning,
Sorry for the confusion. In the previous email I just wanted to say that the problem is not caused by the UUID generation; it is caused by different operators sharing the same directory (because currently Flink uses the JobVertex ID for the directory name).
Best, Congxian
Ah, thank you for the clarification, Congxian. That makes sense. Ning
Thanks for reporting this issue, Ning. I think this is actually a blocker for the next release and should be fixed right away. For future reference, here is the issue [1]. I've also pulled in Stefan, who knows these components very well. Cheers, Till
Till,
Thank you for escalating this to a blocker. I agree that data loss is always a serious issue. For reference, the workaround is to unchain the stateful operators. To make the new job able to recover from the previous checkpoint, we also had to change the UID of the operator that was missing state and restore with the allowNonRestoredState argument; otherwise, the restore would fail with RocksDB errors. — Ning
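Concretely, continuing the hypothetical sketch from my earlier message, the uid change looked roughly like this:

// The operator whose state was lost gets a new uid (and stays unchained), so on
// restore Flink treats it as a new operator with empty state instead of trying
// to match it to the old, incomplete state handle.
DataStream<String> mapped = input
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        })
        .uid("stateful-map-v2")   // was "stateful-map" in the earlier sketch
        .disableChaining();

The job was then resumed with something like flink run -s <checkpoint-or-savepoint-path> --allowNonRestoredState <job-jar>, where the placeholders stand for the actual checkpoint location and job jar; the flag lets the restore proceed even though the state registered under the old uid no longer maps to any operator.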