RocksDB / checkpoint questions


RocksDB / checkpoint questions

Christophe Jolif
If I understand correctly, RocksDB uses two disks: the TaskManager's local disk for "local storage" of the state, and the distributed disk for checkpointing.

Two questions:

- If I have 3 TaskManagers, should I expect to find roughly a third of my overall state (depending on how the tasks are balanced) stored on disk on each TaskManager node?

- If the local node/disk fails, I will get the state back from the distributed disk, things will start again, and all is fine. However, what happens if the distributed disk fails? Will Flink continue processing while waiting for me to mount a new distributed disk? Or will it stop? Could I lose data or have to reprocess things in that situation?

--
Christophe Jolif

Re: RocksDB / checkpoint questions

Kien Truong


On Feb 3, 2018, at 10:48, Kien Truong <[hidden email]> wrote:
Hi,
Speaking from my experience, if the distributed disk fails, the checkpoint will fail as well, but the job will continue running. The checkpoint scheduler will also keep running, so the first scheduled checkpoint after you repair your disk should succeed.

Of course, if you also write to the distributed disk inside your job, then your job may crash too, but this is unrelated to the checkpoint process.

Best regards,
Kien


Re: RocksDB / checkpoint questions

Christophe Jolif
Thanks for sharing Kien. Sounds like the logical behavior but good to hear it is confirmed by your experience.

--
Christophe



Re: RocksDB / checkpoint questions

Stefan Richter
Hi,

You are correct that RocksDB has a "working directory" on local disk, and checkpoints + savepoints go to a distributed filesystem.

- if I have 3 TaskManager I should expect more or less (depending on how the tasks are balanced) to find a third of my overall state stored on disk on each of this TaskManager node?

This question is less about RocksDB and more about Flink’s keyBy partitioning, i.e. how work is distributed between the parallel instances of an operator. Flink applies hash partitioning to your event keys to distribute the keys (and their state) between your 3 nodes. If your key space is very skewed, or there are heavy-hitter keys with much larger state than most other keys, this can lead to imbalances. If your keys are not skewed and have similar state sizes, every node should hold roughly the same amount of state.
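
To make the effect of key skew concrete, here is a minimal sketch in plain Python (not Flink code). It assumes a simplified two-step mapping, key to key group and key group to parallel instance; the hash function, the constant MAX_PARALLELISM = 128, and all helper names are illustrative assumptions rather than Flink's actual implementation.

```python
import hashlib
from collections import Counter

MAX_PARALLELISM = 128  # assumed number of key groups (illustrative value)
PARALLELISM = 3        # three parallel instances, as in the question


def stable_hash(key: str) -> int:
    """Deterministic hash; Python's built-in hash() is salted per process."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big")


def instance_for_key(key: str) -> int:
    """Map a key to a key group, then the key group to a parallel instance."""
    key_group = stable_hash(key) % MAX_PARALLELISM
    return key_group * PARALLELISM // MAX_PARALLELISM


def state_per_instance(state_sizes: dict) -> list:
    """Sum the (hypothetical) state size of all keys landing on each instance."""
    totals = Counter()
    for key, size in state_sizes.items():
        totals[instance_for_key(key)] += size
    return [totals[i] for i in range(PARALLELISM)]


# 30,000 keys with one unit of state each: close to an even three-way split.
uniform = {f"user-{i}": 1 for i in range(30_000)}

# Same keys, but one heavy-hitter key carries 20,000 units of state.
skewed = dict(uniform)
skewed["user-0"] = 20_000

uniform_totals = state_per_instance(uniform)
skewed_totals = state_per_instance(skewed)
print("uniform:", uniform_totals)
print("skewed: ", skewed_totals)
```

With evenly sized keys the three totals come out close to each other, while the single heavy key pushes one instance far above the other two, which is the imbalance described above.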

- if the local node/disk fails I will get the state back from the distributed disk and things will start again and all is fine. However what happens if the distributed disk fails? Will Flink continue processing waiting for me to mount a new distributed disk? Or will it stop? May I lose data/reprocess things under that condition? 

Starting from Flink 1.5, this is configurable; please see https://issues.apache.org/jira/browse/FLINK-4809 and https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/checkpointing.html (section "fail/continue task on checkpoint errors"). If you tolerate checkpoint failures, you will not lose data: if your job fails, it can recover from the latest successful checkpoint once your DFS is available again. If the job does not fail, it will eventually make another checkpoint once the DFS is back. If you do not tolerate checkpoint failures, your job will simply fail, then restart from the last successful checkpoint and recover once the DFS is back.
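
The two behaviours can be compared with a toy model in plain Python. This is only a sketch under stated assumptions, not Flink code: the `simulate` function, the `JobRun` class, and the fixed 100 records per checkpoint interval are all hypothetical, and the model assumes a replayable source so that rewound records are reprocessed rather than lost.

```python
from dataclasses import dataclass


@dataclass
class JobRun:
    position: int = 0                    # records processed so far
    last_successful_checkpoint: int = 0  # position covered by the last good checkpoint
    restarts: int = 0                    # how often the job failed and rewound


def simulate(dfs_up, tolerate, records_per_interval=100):
    """Run one checkpoint attempt per entry of dfs_up (True = DFS reachable)."""
    job = JobRun()
    for up in dfs_up:
        job.position += records_per_interval
        if up:
            # Checkpoint succeeds and covers everything processed so far.
            job.last_successful_checkpoint = job.position
        elif not tolerate:
            # Checkpoint error fails the job: rewind to the last good checkpoint.
            job.position = job.last_successful_checkpoint
            job.restarts += 1
        # If tolerate is True, the failed checkpoint is skipped and the job
        # simply keeps running until the next scheduled attempt.
    return job


# DFS is reachable, then down for two checkpoint attempts, then back.
timeline = [True, False, False, True]

tolerant = simulate(timeline, tolerate=True)
strict = simulate(timeline, tolerate=False)
print("tolerant:", tolerant)
print("strict:  ", strict)
```

In both modes the last checkpoint after the DFS returns covers everything the job has processed, so no state is lost. The difference in this model is that the strict job restarts on each failed attempt and repeats work, while the tolerant job keeps making progress during the outage.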

Best,
Stefan




Re: RocksDB / checkpoint questions

Christophe Jolif
Thanks a lot for the details, Stefan.

--
Christophe
