Hello everyone! In our planned setup we have two data centers, each in a different geographic zone (plus a third one for ZooKeeper, as a tie-breaker). We use ZooKeeper-based HA as follows. Normally, DC1 runs our job.
If DC1 crashes, DC2 takes over: it starts the Flink processes and resumes our job from its latest checkpoint.
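The tie-breaker site matters because a ZooKeeper ensemble only keeps working while a strict majority of its voting servers is reachable. As a minimal sketch of that majority rule, the Java class below checks whether a quorum survives the loss of one whole site. The 2/2/1 split of five servers across DC1, DC2 and the tie-breaker site is a hypothetical layout (the post does not give node counts), and ZkQuorumCheck is an illustrative name, not part of ZooKeeper or Flink.

```java
import java.util.Arrays;

/** Hypothetical helper: checks whether a ZooKeeper ensemble keeps a quorum after losing one site. */
final class ZkQuorumCheck {

    /** ZooKeeper needs a strict majority of the voting servers: floor(n / 2) + 1. */
    static int quorumSize(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    /**
     * @param serversPerSite number of ZooKeeper servers in each site (assumed layout)
     * @param failedSite     index of the site that goes down entirely
     * @return true if the surviving servers still form a majority of the whole ensemble
     */
    static boolean hasQuorumAfterLosing(int[] serversPerSite, int failedSite) {
        int total = Arrays.stream(serversPerSite).sum();
        int surviving = total - serversPerSite[failedSite];
        return surviving >= quorumSize(total);
    }

    public static void main(String[] args) {
        int[] layout = {2, 2, 1}; // DC1, DC2, tie-breaker site (hypothetical split)
        String[] names = {"DC1", "DC2", "tie-breaker"};
        for (int site = 0; site < layout.length; site++) {
            System.out.printf("lose %s -> quorum kept: %b%n",
                    names[site], hasQuorumAfterLosing(layout, site));
        }
    }
}
```

With the assumed 2/2/1 layout, losing any single site leaves at least 3 of 5 servers, which is a quorum. Without the third site (say 3 servers in DC1 and 2 in DC2), losing DC1 leaves 2 of 5, below the quorum of 3, so DC2 could not form a working ensemble on its own.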
For checkpoints, we use the filesystem state backend on a NAS. The NAS is in DC1, and it is replicated to DC2 in the background once a minute. If DC1 crashes, we recover our job in DC2 from that NAS replica (so this is not a zero-data-loss setup).
Our concern is that when the job recovers in DC2, the checkpoint state in the NAS replica may lag behind ZooKeeper's checkpoint reference. That is, ZooKeeper might point to checkpoint x while the DC2 NAS still has only checkpoint x-1.
We hoped to solve this with the configuration "state.checkpoints.num-retained: 3". With the 3 latest checkpoints retained, we simulated the scenario by deleting the files of checkpoint 14 (the current latest, besides 13 and 12) while leaving the reference to checkpoint 14 in ZooKeeper. We hoped Flink would fail over from 14 to checkpoint 13, but instead it keeps trying to recover from 14 and fails with "No such file or directory", like this:
java.lang.IllegalStateException: Could not initialize keyed state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:292) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:224) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) [flink-dist_2.11-1.4.2.jar:1.4.2]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161]
Caused by: java.io.FileNotFoundException: /logs/failsafe/checkpoints/3e1de245fb1cf1226aad7351a818be96/chk-14/57d26c1c-df40-4b6f-8046-500eb4c8a0b2 (No such file or directory)
    at java.io.FileInputStream.open0(Native Method) ~[?:1.8.0_161]
    at java.io.FileInputStream.open(FileInputStream.java:195) ~[?:1.8.0_161]
    at java.io.FileInputStream.<init>(FileInputStream.java:138) ~[?:1.8.0_161]
    at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:80) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:70) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.runtime.state.KeyGroupsStateHandle.openInputStream(KeyGroupsStateHandle.java:112) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:471) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:446) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:282) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
    ... 6 more
BTW, if we delete the reference to checkpoint 14 in ZK, the job recovers successfully from checkpoint 13. So we could try to automate that: monitor for failures to recover checkpoint x and, when one is detected, delete x in ZK so that x-1 is used instead. Not great.
Do you have better ideas for handling a setup like this, with checkpoints replicated between two DCs and a ZK cluster for HA stretched over these DCs?
Thanks!
Shay
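The monitoring idea above can be sketched in plain Java (not against Flink or ZooKeeper APIs). Given the checkpoint IDs that ZooKeeper still references, newest first, and the checkpoint root on the DC2 replica, newestCompleteCheckpoint returns the newest checkpoint whose chk-N directory and expected files all exist there; references newer than that ID would be the candidates for removal in ZK. Where the ID list and the expected file names come from is left open, and ReplicaCheckpointPicker and its methods are hypothetical. The sketch also includes a rough worst-case estimate of how many completed checkpoints a replica lagging by up to lagMs can be missing, assuming checkpoints complete at a fixed interval and the replica reflects a consistent snapshot of the primary; under those assumptions, num-retained only helps if it is larger than that number.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper: finds the newest checkpoint that is fully present on the DC2 replica. */
final class ReplicaCheckpointPicker {

    /**
     * Worst-case number of completed checkpoints missing on the replica, assuming checkpoints
     * complete every intervalMs and the replica reflects the primary as of at most lagMs ago.
     */
    static long maxMissingCheckpoints(long lagMs, long intervalMs) {
        return (lagMs + intervalMs - 1) / intervalMs; // ceil(lag / interval) for positive values
    }

    /** Smallest num-retained that still leaves at least one retained checkpoint on the replica. */
    static long minRetainedForLag(long lagMs, long intervalMs) {
        return maxMissingCheckpoints(lagMs, intervalMs) + 1;
    }

    /**
     * @param checkpointRoot replica directory holding the chk-N subdirectories
     * @param newestFirst    checkpoint IDs still referenced in ZooKeeper, newest first
     * @param expectedFiles  file names each checkpoint should contain (source left open)
     * @return the newest checkpoint ID whose directory and expected files all exist, if any.
     *         Only existence is checked; a partially copied file would still pass.
     */
    static Optional<Long> newestCompleteCheckpoint(Path checkpointRoot,
                                                   List<Long> newestFirst,
                                                   Map<Long, List<String>> expectedFiles) {
        for (long id : newestFirst) {
            Path dir = checkpointRoot.resolve("chk-" + id);
            List<String> files = expectedFiles.getOrDefault(id, Collections.emptyList());
            boolean complete = Files.isDirectory(dir)
                    && files.stream().allMatch(f -> Files.isRegularFile(dir.resolve(f)));
            if (complete) {
                return Optional.of(id);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws IOException {
        // Simulate the test from the post: chk-14 is still referenced but its state file is gone.
        Path root = Files.createTempDirectory("replica-checkpoints");
        Map<Long, List<String>> expected = new HashMap<>();
        for (long id = 12; id <= 14; id++) {
            Path dir = Files.createDirectories(root.resolve("chk-" + id));
            String stateFile = "state-" + id; // hypothetical file name
            expected.put(id, Collections.singletonList(stateFile));
            if (id != 14) {
                Files.createFile(dir.resolve(stateFile));
            }
        }
        // prints Optional[13]
        System.out.println(newestCompleteCheckpoint(root, Arrays.asList(14L, 13L, 12L), expected));
        // 60 s replication lag with a hypothetical 10 s checkpoint interval; prints 7
        System.out.println(minRetainedForLag(60_000, 10_000));
    }
}
```

In the simulated run, chk-14 is missing its state file, so the sketch settles on 13, which matches the manual fix of deleting checkpoint 14 in ZK. With once-a-minute replication and a hypothetical 10 s checkpoint interval, up to 6 completed checkpoints could be missing on the replica, so under these assumptions a num-retained of 3 would not be enough.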