about the checkpoint and state backend


Jinhua Luo
Hi All,

I have two questions:

a) Are the records/elements themselves checkpointed, or just the
record offsets? That is, what data is included in the checkpoint
besides state?

b) Where does Flink store the state globally, so that the job manager
can restore it on each task manager after a failure?

For the heap backend, all task managers would send their state to the
job manager, and the job manager would keep it on its heap, correct?

For the fs/rocksdb backends, would all task managers save their state
(incrementally or not) to a local path temporarily, and then send it (in
RocksDB snapshot format, in the rocksdb case?) to the job manager at
checkpoint time?

Is the path we use to configure the backend a path on the job manager
machine rather than on the task managers' machines? Would that make it a
bottleneck and a single point of failure? Is it therefore better to use
an HDFS path, so that the storage can scale and is highly available as
well?

Thank you all.

Re: about the checkpoint and state backend

Stefan Richter
Hi,

> I have two questions:
>
> a) Are the records/elements themselves checkpointed, or just the
> record offsets? That is, what data is included in the checkpoint
> besides state?

No, just offsets (or something similar, depending on the source), which are part of the state of the source operators.
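
To illustrate the point, here is a minimal plain-Java sketch. It is not Flink code: `OffsetSource`, `snapshotState` and `restoreState` are hypothetical names chosen for this example. It models a source whose checkpointed state is only its read position; the records are never copied into the checkpoint and are simply re-read from the external system after a restore.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a replayable source; this is not Flink's API.
final class OffsetSource {
    private final List<String> log;   // stands in for an external, replayable log
    private long offset = 0;          // the only state this source owns

    OffsetSource(List<String> log) {
        this.log = log;
    }

    // Emit the next record, or null when the log is exhausted.
    String poll() {
        return offset < log.size() ? log.get((int) offset++) : null;
    }

    // A checkpoint captures just the offset, never the records.
    long snapshotState() {
        return offset;
    }

    // After a failure, the source rewinds to the checkpointed offset.
    void restoreState(long checkpointedOffset) {
        this.offset = checkpointedOffset;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("a", "b", "c", "d");
        OffsetSource source = new OffsetSource(log);
        source.poll();                          // emits "a"
        source.poll();                          // emits "b"
        long checkpoint = source.snapshotState();
        source.poll();                          // emits "c" after the checkpoint

        // Simulated failure: a fresh instance resumes from the checkpoint.
        OffsetSource recovered = new OffsetSource(log);
        recovered.restoreState(checkpoint);
        System.out.println(recovered.poll());   // prints "c" again
    }
}
```

The record "c" is emitted twice in this run, which is why the source has to be replayable for this scheme to work.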

>
> b) Where does Flink store the state globally, so that the job manager
> can restore it on each task manager after a failure?

In any non-test scenario, the checkpoint/savepoint state is stored in a distributed store, e.g. HDFS or S3. On recovery, the job manager just sends "pointers" (we call them state handles) to the task managers, telling them where to find the state in the distributed storage.

>
> For the heap backend, all task managers would send their state to the
> job manager, and the job manager would keep it on its heap, correct?
>
> For the fs/rocksdb backends, would all task managers save their state
> (incrementally or not) to a local path temporarily, and then send it (in
> RocksDB snapshot format, in the rocksdb case?) to the job manager at
> checkpoint time?

This is basically the inverse of the restore. For non-test scenarios, the backends write the state to the distributed store and send handles (e.g. filename + offsets) to the job manager. The only exception is the MemoryStateBackend, which is more for testing purposes, and sends state handles that contain the full state as byte arrays.
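
The data flow described here can be sketched in plain Java. This is only a model under stated assumptions: `StateHandleSketch`, `Handle`, `snapshotToStore` and `restore` are hypothetical names, the map stands in for HDFS/S3, and the path string is a placeholder. None of these are Flink classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the checkpoint data flow; not Flink's API.
final class StateHandleSketch {

    // A handle either points into the distributed store or carries the bytes inline.
    static final class Handle {
        final String path;      // non-null for file-based handles
        final byte[] inline;    // non-null for memory-backend-style handles

        private Handle(String path, byte[] inline) {
            this.path = path;
            this.inline = inline;
        }

        static Handle file(String path) { return new Handle(path, null); }
        static Handle inMemory(byte[] bytes) { return new Handle(null, bytes); }
    }

    // Stands in for HDFS/S3: reachable from every task manager.
    static final Map<String, byte[]> distributedStore = new HashMap<>();

    // Task manager side: write state to the store, return only a pointer.
    static Handle snapshotToStore(String path, byte[] state) {
        distributedStore.put(path, state.clone());
        return Handle.file(path);
    }

    // Task manager side on recovery: resolve the handle received from the job manager.
    static byte[] restore(Handle handle) {
        return handle.path != null ? distributedStore.get(handle.path) : handle.inline;
    }

    public static void main(String[] args) {
        byte[] state = "count=42".getBytes(StandardCharsets.UTF_8);

        // The job manager would keep just this small handle, not the state itself.
        Handle handle = snapshotToStore("hdfs:///checkpoints/chk-1/op-0", state);
        System.out.println(handle.path);
        System.out.println(new String(restore(handle), StandardCharsets.UTF_8));

        // Memory-backend style: the handle itself carries the full state.
        Handle inline = Handle.inMemory(state);
        System.out.println(new String(restore(inline), StandardCharsets.UTF_8));
    }
}
```

In the file-based case the job manager only ever holds the short path string, which is why it does not become a storage bottleneck.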

>
> Is the path we use to configure the backend a path on the job manager
> machine rather than on the task managers' machines? Would that make it a
> bottleneck and a single point of failure? Is it therefore better to use
> an HDFS path, so that the storage can scale and is highly available as
> well?

You should configure a path on the distributed storage, so that it is not a single point of failure. After all, the state must still be accessible to all task managers via the state handles in case of node failures. The job manager does not collect the actual state.

Best,
Stefan

Re: about the checkpoint and state backend

Timo Walther
In reply to this post by Jinhua Luo
Hi Jinhua,

I will try to answer your questions:

Flink checkpoints the state of each operator. For a Kafka consumer
operator this is only the offset. For other operators (such as windows
or a ProcessFunction) the values/lists/maps stored in the state are
checkpointed. If you are interested in the internals, I would recommend
this page [1]. Only the MemoryStateBackend sends entire states to the
JobManager (see [2]). But you are right, this is a bottleneck and not
very fault-tolerant. Usually, Flink assumes that some distributed
file system (such as HDFS) is available, to which each operator's state
can be checkpointed in a fault-tolerant way. For the RocksDBStateBackend,
the local files are copied to HDFS as well. At the time of writing, only
the RocksDBStateBackend supports incremental checkpoints. The JobManager
can then read from HDFS and restore the operator on a different machine.
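
One way to picture an incremental checkpoint is sketched below in plain Java. This is an illustrative model only, assuming that local files are immutable once written (as RocksDB's SST files are), so a file uploaded for an earlier checkpoint never needs to be copied again. `IncrementalSketch` and the file names are hypothetical and not part of Flink.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of incremental uploads; not Flink's implementation.
final class IncrementalSketch {
    // Files already copied to the distributed store by earlier checkpoints.
    private final Set<String> alreadyUploaded = new HashSet<>();

    // Returns the files that must be copied for this checkpoint:
    // only those not already present from an earlier checkpoint.
    Set<String> checkpoint(Set<String> localFiles) {
        Set<String> toUpload = new TreeSet<>(localFiles);
        toUpload.removeAll(alreadyUploaded);
        alreadyUploaded.addAll(toUpload);
        return toUpload;
    }

    public static void main(String[] args) {
        IncrementalSketch backend = new IncrementalSketch();

        Set<String> first = new TreeSet<>(Arrays.asList("001.sst", "002.sst"));
        System.out.println(backend.checkpoint(first));   // [001.sst, 002.sst]

        // One new file appeared since the previous checkpoint.
        Set<String> second = new TreeSet<>(Arrays.asList("001.sst", "002.sst", "003.sst"));
        System.out.println(backend.checkpoint(second));  // [003.sst]
    }
}
```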

Feel free to ask further questions.

Regards,
Timo

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html



On 1/1/18 at 3:50 PM, Jinhua Luo wrote:

> Hi All,
>
> I have two questions:
>
> a) Are the records/elements themselves checkpointed, or just the
> record offsets? That is, what data is included in the checkpoint
> besides state?
>
> b) Where does Flink store the state globally, so that the job manager
> can restore it on each task manager after a failure?
>
> For the heap backend, all task managers would send their state to the
> job manager, and the job manager would keep it on its heap, correct?
>
> For the fs/rocksdb backends, would all task managers save their state
> (incrementally or not) to a local path temporarily, and then send it (in
> RocksDB snapshot format, in the rocksdb case?) to the job manager at
> checkpoint time?
>
> Is the path we use to configure the backend a path on the job manager
> machine rather than on the task managers' machines? Would that make it a
> bottleneck and a single point of failure? Is it therefore better to use
> an HDFS path, so that the storage can scale and is highly available as
> well?
>
> Thank you all.


Re: about the checkpoint and state backend

Jinhua Luo
I still do not understand the relationship between the rocksdb backend and
the filesystem (here I mean any filesystem implementation, including local,
hdfs, s3).

For example, when I specify the path for the rocksdb backend:
env.setStateBackend(new RocksDBStateBackend("file:///data1/flinkapp"));

What does it mean?

Would each task manager save its state to /data1/flinkapp on its own
machine? That seems to make no sense, because when one of the machines
crashes, the job manager cannot access the state on the dead machine.
Or does each task manager create a rocksdb instance on a temporary path and
send snapshots to the job manager, which in turn saves them to
/data1/flinkapp on the job manager's machine?

Could you give an example of the data flow?

Another question: when checkpointing is turned off (which is also the
default configuration), what happens to state handling?



2018-01-03 0:06 GMT+08:00 Timo Walther:

> Hi Jinhua,
>
> I will try to answer your questions:
>
> Flink checkpoints the state of each operator. For a Kafka consumer operator
> this is only the offset. For other operators (such as windows or a
> ProcessFunction) the values/lists/maps stored in the state are checkpointed.
> If you are interested in the internals, I would recommend this page [1].
> Only the MemoryStateBackend sends entire states to the JobManager (see [2]).
> But you are right, this is a bottleneck and not very fault-tolerant.
> Usually, Flink assumes that some distributed file system (such as HDFS) is
> available, to which each operator's state can be checkpointed in a
> fault-tolerant way. For the RocksDBStateBackend, the local files are copied
> to HDFS as well. At the time of writing, only the RocksDBStateBackend
> supports incremental checkpoints. The JobManager can then read from HDFS
> and restore the operator on a different machine.
>
> Feel free to ask further questions.
>
> Regards,
> Timo
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html

Re: about the checkpoint and state backend

Aljoscha Krettek
Hi,

As you correctly mentioned, the path you give to the constructor must be a path on some distributed filesystem; otherwise the data will be lost when the local machine crashes.

RocksDB will keep its working files in a local directory (you can specify this using setDbStoragePath()), and when checkpointing it will write to the checkpoint directory that you specified in the constructor.
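
As a configuration sketch of the two directories mentioned above, the fragment below separates the checkpoint directory from the local RocksDB working directory. It assumes the Flink 1.4-era API used elsewhere in this thread, with Flink and its RocksDB state backend module on the classpath; later releases may differ. Both paths and the checkpoint interval are placeholders, and the class name `RocksDbBackendSetup` is made up for this example.

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint directory: should be on a filesystem every machine can reach.
        // "hdfs:///flink/checkpoints" is a placeholder path.
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");

        // Local working directory for the RocksDB files on each task manager.
        // "/data1/rocksdb" is a placeholder path.
        backend.setDbStoragePath("/data1/rocksdb");

        env.setStateBackend(backend);

        // Checkpointing is off unless enabled; 60 s is an arbitrary interval.
        env.enableCheckpointing(60_000L);

        // Sources, transformations and sinks would be defined here,
        // followed by a call to env.execute(...).
    }
}
```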

Best,
Aljoscha


> On 4. Jan 2018, at 14:23, Jinhua Luo wrote:
>
> I still do not understand the relationship between the rocksdb backend and
> the filesystem (here I mean any filesystem implementation, including local,
> hdfs, s3).
>
> For example, when I specify the path for the rocksdb backend:
> env.setStateBackend(new RocksDBStateBackend("file:///data1/flinkapp"));
>
> What does it mean?
>
> Would each task manager save its state to /data1/flinkapp on its own
> machine? That seems to make no sense, because when one of the machines
> crashes, the job manager cannot access the state on the dead machine.
> Or does each task manager create a rocksdb instance on a temporary path and
> send snapshots to the job manager, which in turn saves them to
> /data1/flinkapp on the job manager's machine?
>
> Could you give an example of the data flow?
>
> Another question: when checkpointing is turned off (which is also the
> default configuration), what happens to state handling?

Re: about the checkpoint and state backend

Jinhua Luo
OK, I think I get the point.

But another question arises: how do the task managers merge their rocksdb
snapshots into a single global path?


2018-01-04 21:30 GMT+08:00 Aljoscha Krettek:

> Hi,
>
> As you correctly mentioned, the path you give to the constructor must be a path on some distributed filesystem; otherwise the data will be lost when the local machine crashes.
>
> RocksDB will keep its working files in a local directory (you can specify this using setDbStoragePath()), and when checkpointing it will write to the checkpoint directory that you specified in the constructor.
>
> Best,
> Aljoscha

Re: about the checkpoint and state backend

Aljoscha Krettek
Each operator (which runs in a TaskManager) writes its state to some location in HDFS (or any other DFS) and sends a handle to it to the CheckpointCoordinator (which runs on the JobManager). The CheckpointCoordinator collects all the handles and creates one uber-handle, which describes the complete checkpoint. When restoring, the CheckpointCoordinator figures out which handles need to be sent to which operators.
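
A rough plain-Java model of this handle collection follows. It is a sketch only: `CoordinatorSketch`, its methods, the subtask names and the path layout are assumptions made for illustration, and the real CheckpointCoordinator is considerably more involved. The point it tries to show is that each subtask writes to its own location, so nothing has to be merged into one file.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of handle collection; not Flink's CheckpointCoordinator.
final class CoordinatorSketch {
    private final int expectedSubtasks;
    // subtask name -> location of that subtask's state in the distributed store
    private final Map<String, String> handles = new TreeMap<>();

    CoordinatorSketch(int expectedSubtasks) {
        this.expectedSubtasks = expectedSubtasks;
    }

    // Called when a subtask acknowledges the checkpoint with its handle.
    void acknowledge(String subtask, String handle) {
        handles.put(subtask, handle);
    }

    // The checkpoint is complete once every subtask has reported a handle.
    boolean isComplete() {
        return handles.size() == expectedSubtasks;
    }

    // On restore, look up which handle goes back to which subtask.
    String handleFor(String subtask) {
        return handles.get(subtask);
    }

    // Each subtask writes under its own sub-path (illustrative layout only).
    static String locationFor(String checkpointDir, long checkpointId, String subtask) {
        return checkpointDir + "/chk-" + checkpointId + "/" + subtask;
    }

    public static void main(String[] args) {
        String dir = "hdfs:///flink/checkpoints"; // placeholder path
        CoordinatorSketch coordinator = new CoordinatorSketch(2);

        coordinator.acknowledge("window-0", locationFor(dir, 7, "window-0"));
        System.out.println(coordinator.isComplete());   // false, one ack is missing
        coordinator.acknowledge("window-1", locationFor(dir, 7, "window-1"));
        System.out.println(coordinator.isComplete());   // true

        // Prints hdfs:///flink/checkpoints/chk-7/window-1
        System.out.println(coordinator.handleFor("window-1"));
    }
}
```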

Best,
Aljoscha

> On 4. Jan 2018, at 14:44, Jinhua Luo wrote:
>
> OK, I think I get the point.
>
> But another question arises: how do the task managers merge their rocksdb
> snapshots into a single global path?

Re: about the checkpoint and state backend

Jinhua Luo
ok, I see.

But as far as I know, one rocksdb instance occupies one directory, so I am
still wondering what the relationship is between the states and the rocksdb
instances.

2018-01-04 21:50 GMT+08:00 Aljoscha Krettek:

> Each operator (which runs in a TaskManager) writes its state to some location in HDFS (or any other DFS) and sends a handle to it to the CheckpointCoordinator (which runs on the JobManager). The CheckpointCoordinator collects all the handles and creates one uber-handle, which describes the complete checkpoint. When restoring, the CheckpointCoordinator figures out which handles need to be sent to which operators.
>
> Best,
> Aljoscha

Re: about the checkpoint and state backend

Aljoscha Krettek
Ah, I see. Currently, the RocksDB backend uses one column family in RocksDB per registered state. The values for different keys of one state are stored in the same column family.
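
The layout described here (one column, i.e. a column family in RocksDB terms, per registered state, shared by all keys of that state) can be pictured with a small in-memory model. This is a plain-Java sketch, not the RocksDB or Flink API; `ColumnPerStateSketch`, the state names and the keys are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for one RocksDB instance; not a real API.
final class ColumnPerStateSketch {
    // One sorted map ("column") per registered state name.
    private final Map<String, TreeMap<String, String>> columns = new HashMap<>();

    // Registering a state creates its column if it does not exist yet.
    void register(String stateName) {
        columns.computeIfAbsent(stateName, n -> new TreeMap<>());
    }

    // Values of the same state for different keys land in the same column.
    void put(String stateName, String key, String value) {
        columns.get(stateName).put(key, value);
    }

    String get(String stateName, String key) {
        return columns.get(stateName).get(key);
    }

    int columnCount() {
        return columns.size();
    }

    int entriesIn(String stateName) {
        return columns.get(stateName).size();
    }

    public static void main(String[] args) {
        ColumnPerStateSketch db = new ColumnPerStateSketch();
        db.register("count");
        db.register("lastSeen");

        db.put("count", "user-a", "3");
        db.put("count", "user-b", "5");
        db.put("lastSeen", "user-a", "2018-01-04");

        System.out.println(db.columnCount());            // 2 states, 2 columns
        System.out.println(db.entriesIn("count"));       // 2 keys in one column
        System.out.println(db.get("count", "user-b"));   // 5
    }
}
```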

> On 4. Jan 2018, at 14:56, Jinhua Luo wrote:
>
> ok, I see.
>
> But as far as I know, one rocksdb instance occupies one directory, so I am
> still wondering what the relationship is between the states and the rocksdb
> instances.
>
> 2018-01-04 21:50 GMT+08:00 Aljoscha Krettek <[hidden email]>:
>> Each operator (which run in a TaskManager) will write its state to some location in HDFS (or any other DFS) and send a handle to this to the CheckpointCoordinator (which is running on the JobManager). The CheckpointCoordinator is collecting all the handles and creating one Uber-Handle, which describes the complete checkpoint. When restoring, the CheckpointCoordinator figures out which handles need to be sent to which operators for restoring.
>>
>> Best,
>> Aljoscha
>>
>>> On 4. Jan 2018, at 14:44, Jinhua Luo <[hidden email]> wrote:
>>>
>>> OK, I think I get the point.
>>>
>>> But another question raises: how task managers merge their rocksdb
>>> snapshot on a global single path?
>>>
>>>
>>> 2018-01-04 21:30 GMT+08:00 Aljoscha Krettek <[hidden email]>:
>>>> Hi,
>>>>
>>>> The path you give to the constructor must be a path on some distributed filesystem, otherwise the data will be lost when the local machine crashes. As you mentioned correctly.
>>>>
>>>> RocksDB will keep files in a local directory (you can specify this using setDbStoragePath()) and when checkpointing will write to the checkpoint directory that you specified in the constructor.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>>
>>>>> On 4. Jan 2018, at 14:23, Jinhua Luo <[hidden email]> wrote:
>>>>>
>>>>> I still do not understand the relationship between rocksdb backend and
>>>>> the filesystem (here I refer to any filesystem impl, including local,
>>>>> hdfs, s3).
>>>>>
>>>>> For example, when I specify the path to rocksdb backend:
>>>>> env.setStateBackend(new RocksDBStateBackend("file:///data1/flinkapp"));
>>>>>
>>>>> What does it mean?
>>>>>
>>>>> Each task manager would save states to /data1/flinkapp on its machine?
>>>>> But it seems no sense. Because when one of the machines crashes, the
>>>>> job manager could not access the states on dead machine.
>>>>> Or, each task manager creates rocksdb instance on temporary path, and
>>>>> send snapshots to job manager, then job manager in turn saves them on
>>>>> /data1/flinkapp on the job manager's machine?
>>>>>
>>>>> Could you give the data flow example?
>>>>>
>>>>> And another question is, when I turn off checkpointing (which is also
>>>>> default cfg), what happens to the states processing?
>>>>>
>>>>>
>>>>>
>>>>> 2018-01-03 0:06 GMT+08:00 Timo Walther <[hidden email]>:
>>>>>> Hi Jinhua,
>>>>>>
>>>>>> I will try to answer your questions:
>>>>>>
>>>>>> Flink checkpoints the state of each operator. For a Kafka consumer operator
>>>>>> this is only the offset. For other operators (such as Windows or a
>>>>>> ProcessFunction) the values/list/maps stored in the state are checkpointed.
>>>>>> If you are interested in the internals, I would recommend this page [1].
>>>>>> Only the MemoryStateBackend sends entire states to the JobManager (see [2]).
>>>>>> But you are right, this is a bottleneck and not very fault-tolerant.
>>>>>> Usually, Flink assumes to have some distributed file system (such as HDFS)
>>>>>> to which each Flink operator can be checkpointed in a fault-tolerant way.
>>>>>> For the RocksDbStateBackend the local files are copied to HDFS as well. At
>>>>>> the time of writing, only the RocksDBBackend supports incremental
>>>>>> checkpoints. The JobManager can then read from HDFS and restore the operator
>>>>>> on a different machine.
>>>>>>
>>>>>> Feel free to ask further questions.
>>>>>>
>>>>>> Regards,
>>>>>> Timo
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html
>>>>>> [2]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html
>>>>>>
>>>>>>
>>>>>>
>>>>>> Am 1/1/18 um 3:50 PM schrieb Jinhua Luo:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I have two questions:
>>>>>>>
>>>>>>> a) does the records/elements themselves would be checkpointed? or just
>>>>>>> record offset checkpointed? That is, what data included in the
>>>>>>> checkpoint except for states?
>>>>>>>
>>>>>>> b) where flink stores the state globally? so that the job manager
>>>>>>> could restore them on each task manger at failure restart.
>>>>>>>
>>>>>>> For the heap backend, all task managers would send states to job
>>>>>>> manager, and job manager would save it in its heap, correct?
>>>>>>>
>>>>>>> For the fs/rocksdb backend, all task managers would save states
>>>>>>> (incrementally or not) in local path temporarily, and send them (in
>>>>>>> rocksdb snapshot format for the rocksdb case?) to the job manager at
>>>>>>> checkpoint?
>>>>>>>
>>>>>>> The path we used to configure backend is the path on the job manager
>>>>>>> machine but not on the task managers' machines? So that's the
>>>>>>> bottleneck and single failure point? So it's better to use hdfs path
>>>>>>> so that we could scale the storage and make it high availability as
>>>>>>> well?
>>>>>>>
>>>>>>> Thank you all.
>>>>>>
>>>>>>
>>>>>>
>>>>
>>


Re: about the checkpoint and state backend

Jinhua Luo
Each task manager creates one RocksDB instance in its local temporary
directory, correct?
Since a cluster usually has multiple task managers, how do they handle
directory conflicts, given that one RocksDB instance occupies one
directory? That is what I asked at first: how do they merge their
RocksDB instances and store them under the single distributed
filesystem path?


Re: about the checkpoint and state backend

Aljoscha Krettek
TaskManagers don't do any checkpointing themselves, but the operators that run in TaskManagers do.

Each operator, of which there are multiple running on multiple TMs in the cluster, will write to its own unique DFS directory. Something like:
/checkpoints/job-xyz/checkpoint-1/operator-a/1

These individual checkpoints are not merged into one directory. Instead, the handles to those directories are sent to the CheckpointCoordinator, which creates a checkpoint that stores handles to all the states stored in DFS.
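
To make the "unique directory plus handle" idea concrete, here is a minimal plain-Java sketch. It is not Flink code: StateHandle, CheckpointSketch, stateDir and completeCheckpoint are hypothetical names made up for illustration, and the path layout simply mirrors the example path above. It only shows why no merging is needed: every operator subtask gets its own directory, and the coordinator keeps nothing but the list of paths.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only: these are NOT Flink classes, all names are hypothetical.
final class StateHandle {
    final String operatorId;
    final int subtaskIndex;
    final String dfsPath; // pointer to where the state was written

    StateHandle(String operatorId, int subtaskIndex, String dfsPath) {
        this.operatorId = operatorId;
        this.subtaskIndex = subtaskIndex;
        this.dfsPath = dfsPath;
    }
}

final class CheckpointSketch {
    // Each operator subtask derives its own directory, so writers never collide.
    static String stateDir(String root, String jobId, long checkpointId,
                           String operatorId, int subtaskIndex) {
        return root + "/" + jobId + "/checkpoint-" + checkpointId
                + "/" + operatorId + "/" + subtaskIndex;
    }

    // The coordinator only gathers handles (pointers); it never copies state bytes.
    static List<StateHandle> completeCheckpoint(String root, String jobId, long checkpointId,
                                                String[] operatorIds, int parallelism) {
        List<StateHandle> handles = new ArrayList<>();
        for (String op : operatorIds) {
            for (int subtask = 1; subtask <= parallelism; subtask++) {
                handles.add(new StateHandle(op, subtask,
                        stateDir(root, jobId, checkpointId, op, subtask)));
            }
        }
        return handles;
    }

    public static void main(String[] args) {
        List<StateHandle> handles = completeCheckpoint(
                "/checkpoints", "job-xyz", 1L, new String[] {"operator-a", "operator-b"}, 2);
        for (StateHandle h : handles) {
            System.out.println(h.operatorId + "#" + h.subtaskIndex + " -> " + h.dfsPath);
        }
    }
}
```

On restore, the same list would be used in the other direction: each handle is handed back to the matching operator subtask, which then reads its own directory.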

Best,
Aljoscha



Re: about the checkpoint and state backend

Jinhua Luo
Thanks, I will read the code to get a better understanding.

Let me repeat my other question: what happens if checkpointing is
disabled (which is the default, as far as I know)? Is the state still
saved?


Re: about the checkpoint and state backend

Aljoscha Krettek
If checkpointing is disabled, RocksDB will only store state locally on disk; it will not be checkpointed to a DFS. This means that in case of a failure you lose the state.
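
In a real job, checkpointing is switched on by calling enableCheckpointing(long) with an interval in milliseconds on the StreamExecutionEnvironment. The difference this makes can be sketched with a toy model in plain Java. This is not Flink code: ToyOperator and its methods are hypothetical, the in-memory map stands in for the local RocksDB files, and the snapshot map stands in for a checkpoint on a DFS.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only, not Flink code: all names here are hypothetical.
final class ToyOperator {
    private final boolean checkpointingEnabled;
    // Stands in for the local RocksDB files on the TaskManager's disk.
    private Map<String, Long> localState = new HashMap<>();
    // Stands in for a completed checkpoint on a DFS; null means none exists.
    private Map<String, Long> durableSnapshot = null;

    ToyOperator(boolean checkpointingEnabled) {
        this.checkpointingEnabled = checkpointingEnabled;
    }

    // Normal processing always updates the local state.
    void add(String key, long delta) {
        localState.merge(key, delta, Long::sum);
    }

    // A checkpoint copies local state to durable storage, but only when enabled.
    void checkpoint() {
        if (checkpointingEnabled) {
            durableSnapshot = new HashMap<>(localState);
        }
    }

    // Simulates losing the machine and restarting the operator elsewhere:
    // local state is gone, only the last durable snapshot (if any) survives.
    void failAndRestart() {
        localState = (durableSnapshot == null)
                ? new HashMap<>()
                : new HashMap<>(durableSnapshot);
    }

    long get(String key) {
        return localState.getOrDefault(key, 0L);
    }
}
```

With checkpointing disabled, the operator still keeps and updates its state while it runs, but a restart after a failure begins from empty state. With checkpointing enabled, it resumes from the last completed snapshot, so only updates made after that snapshot are lost in this model.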
