Checkpoints replication over data centers synchronized with ZooKeeper in HA mode

Shimony, Shay

Hello everyone!

In our planned setup we have 2 data centers, each in a different geographic zone (and a third zone for ZooKeeper as a tie breaker). We use HA mode with ZooKeeper, as follows:

 

Normally, DC1 will run our job:

 

DC1                         DC2                         DC3
Machine 1     Machine 2     Machine 3     Machine 4     Machine 5
ZK1           ZK2           ZK3           ZK4           ZK5
JM1 (leader)  JM2           -             -             -
TM1           TM2           -             -             -

But after DC1 crashes, DC2 will take over, start the Flink processes, and resume our job from a checkpoint:

DC1                         DC2                         DC3
Machine 1     Machine 2     Machine 3     Machine 4     Machine 5
-             -             ZK3           ZK4           ZK5
-             -             JM3 (leader)  JM4           -
-             -             TM3           TM4           -
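
For completeness, the ZooKeeper HA part of our flink-conf.yaml is roughly along these lines (hostnames, ports and the HA storage path below are placeholders, not our real values):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181,zk4:2181,zk5:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: file:///logs/failsafe/ha/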

 

 

 

For checkpoints, we use the filesystem state backend over NAS. The NAS lives in DC1 and is replicated to DC2 in the background, once per minute. If DC1 crashes, we recover our job in DC2 over its NAS replica (so zero data loss is not guaranteed).
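
Concretely, the checkpoint part of the configuration is roughly the following (the directory is the NAS mount point that also appears in the stack trace below; key names are as we use them on Flink 1.4):

state.backend: filesystem
state.backend.fs.checkpointdir: file:///logs/failsafe/checkpoints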

Our concern is that when the job recovers in DC2, the checkpoint state on its NAS replica may be behind ZooKeeper’s checkpoint reference. That is, ZooKeeper might point to checkpoint x while the NAS in DC2 still only has checkpoint x-1.

We hoped that the configuration “state.checkpoints.num-retained: 3” would let us solve this. That is, with the 3 latest checkpoints retained, we simulated such a scenario by deleting the files of checkpoint 14 (the current latest; 13 and 12 were also retained) while leaving the reference to checkpoint 14 in ZooKeeper. Our hope was that Flink would fall back to checkpoint 13, but instead it keeps trying to recover from 14 and fails with a “no such file or directory” error, like this:

java.lang.IllegalStateException: Could not initialize keyed state backend.
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:292) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:224) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) [flink-dist_2.11-1.4.2.jar:1.4.2]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161]
Caused by: java.io.FileNotFoundException: /logs/failsafe/checkpoints/3e1de245fb1cf1226aad7351a818be96/chk-14/57d26c1c-df40-4b6f-8046-500eb4c8a0b2 (No such file or directory)
        at java.io.FileInputStream.open0(Native Method) ~[?:1.8.0_161]
        at java.io.FileInputStream.open(FileInputStream.java:195) ~[?:1.8.0_161]
        at java.io.FileInputStream.<init>(FileInputStream.java:138) ~[?:1.8.0_161]
        at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:80) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:70) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.runtime.state.KeyGroupsStateHandle.openInputStream(KeyGroupsStateHandle.java:112) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:471) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:446) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:282) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        ... 6 more

BTW, if we delete the reference to checkpoint 14 in ZK, the job recovers successfully from checkpoint 13. So we could try to automate that: monitor for failures to recover from checkpoint x, and when one is detected, delete its reference in ZK so that x-1 is used instead. Not great.
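
For reference, that manual workaround boils down to roughly the following in the ZooKeeper CLI (the znode paths here are illustrative; the exact layout depends on the high-availability.zookeeper.path.root and cluster-id settings):

# connect to the surviving quorum (hostname is a placeholder)
./zkCli.sh -server zk3:2181

# list the completed checkpoints the HA store still references for our job
ls /flink/<cluster-id>/checkpoints/3e1de245fb1cf1226aad7351a818be96

# drop the node of the checkpoint whose files are missing on the NAS replica,
# so that recovery falls back to the previous retained checkpoint (13 in our test)
rmr /flink/<cluster-id>/checkpoints/3e1de245fb1cf1226aad7351a818be96/<node-for-chk-14>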

 

But maybe you have better ideas for how to deal with such a setup, where checkpoints are replicated between 2 DCs and the ZK cluster used for HA is stretched over these 2 DCs?

 

Thanks!

Shay