fault tolerance model for stateful streams

fault tolerance model for stateful streams

Nathan Forager
hi there,

I noticed the 0.9 release announces exactly-once semantics for streams. I looked at the user guide and the primary mechanism for recovery appears to be checkpointing of user state. I have a few questions:

1. The default behavior is that checkpoints are kept in memory on the JobManager. Am I correct in assuming that this does *not* guarantee failure recovery or exactly-once semantics if the driver fails?

2. The recommended alternative is to checkpoint to HDFS. If I have a program that is, say, aggregating counts over thousands of keys, it doesn't seem tenable to save a huge number of checkpoint files to HDFS with acceptable latency.
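
For concreteness, this is roughly the shape of job I have in mind. The sketch below uses the current DataStream API, so the exact method names may not match 0.9, and the socket source and port are just placeholders:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyedCounts {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint the running counts every 5 seconds.
        env.enableCheckpointing(5000);

        env.socketTextStream("localhost", 9999)      // placeholder source
           .map(new MapFunction<String, Tuple2<String, Long>>() {
               @Override
               public Tuple2<String, Long> map(String key) {
                   return Tuple2.of(key, 1L);
               }
           })
           .keyBy(t -> t.f0)                         // thousands of distinct keys
           .sum(1)                                   // running count per key is operator state
           .print();

        env.execute("keyed counts");
    }
}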

Could you comment on the persistence model for exactly-once in Flink? I am confused because checkpointing to HDFS in this way seems to have scalability and latency limitations.

- nate

Re: fault tolerance model for stateful streams

Aljoscha Krettek
Hi,
Good questions. Regarding 1., you are right: when the JobManager fails, the state is lost. Ufuk, Till, and Stephan are currently working on making the JobManager fault tolerant by running hot-standby JobManagers and storing the important JobManager state in ZooKeeper. Maybe they can comment further on their work here.

Regarding 2. (I hope I'm getting this right): the whole state of one parallel instance of an operator is stored as one piece in an HDFS file, so the number of keys does not increase the number of files that need to be stored. The overhead from checkpointing can be adjusted by tuning the checkpointing interval. I think Gyula, Paris, and Stephan are also working on incremental state checkpoints and on a key-value store that can gracefully go out-of-core and can also do incremental checkpoints.
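
To make the interval tuning concrete, something along these lines should work. This is only a sketch against the current Java API (the CheckpointConfig knobs may not all exist in 0.9, and the numbers are just examples):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuning {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // With parallelism 4, each checkpoint writes roughly one state file per
        // parallel instance of each stateful operator, regardless of key count.
        env.setParallelism(4);

        // Checkpoint every 10 seconds: a larger interval means fewer HDFS writes
        // (less overhead) but more data to replay after a failure.
        env.enableCheckpointing(10_000);

        // Extra knobs from the current CheckpointConfig.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5_000);
        env.getCheckpointConfig().setCheckpointTimeout(60_000);

        // Placeholder pipeline so the program runs as-is.
        env.fromElements(1, 2, 3).print();
        env.execute("checkpoint tuning example");
    }
}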

Cheers,
Aljoscha

Re: fault tolerance model for stateful streams

Nathan Forager
Thanks for the information, Aljoscha!

I'd love to better understand what the long-term solution for fault tolerance is here. Is the idea that ZooKeeper will be used to store the stream state? Or that HDFS can be used efficiently for this? Or are you designing your own persistent key/value storage?

Re: fault tolerance model for stateful streams

Stephan Ewen
Hi Nathan!

The state is stored in a configurable "state backend". The state backend itself must be fault tolerant, like HDFS, HBase, Cassandra, Ignite, ... 
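
For example, a job can point its checkpoint data at HDFS roughly like this. This is only a sketch with the current API (the backend classes have moved around a bit between versions, and the HDFS path is a placeholder):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HdfsBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint data goes to a fault-tolerant filesystem (HDFS here); only
        // small metadata about each checkpoint lives on the JobManager / in ZooKeeper.
        env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));

        // Placeholder pipeline so the program runs as-is.
        env.fromElements("a", "b", "c").print();
        env.execute("hdfs state backend example");
    }
}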

What the highly available Flink version does is store the "StateHandle" in ZooKeeper. The StateHandle is the metadata that points to the state (in the case of HDFS, for example, a list of file paths). After new checkpoints are made, the data from old checkpoints is removed. This amounts to two ZooKeeper writes of a few hundred bytes to a few kilobytes for every checkpoint (which may happen, for example, every second).

What we are working on right now is "incremental checkpointing" of state. You take a full checkpoint once in a while (saving the complete state to HDFS) and frequent delta snapshots in between (files containing only the changes). A few more files need to stay around for this, but after the next full checkpoint, the files from the previous delta checkpoints can be removed. ZooKeeper accumulates all state handles until the next full checkpoint.
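
Purely as an illustration of that retention rule (this is not Flink's actual StateHandle interface, just a sketch of the bookkeeping involved):

import java.util.ArrayList;
import java.util.List;

// Illustrative only: a "handle" is just metadata pointing at checkpointed
// state, e.g. a list of HDFS file paths.
public class HandleRetention {

    public static final class Handle {
        final long checkpointId;
        final boolean full;          // full checkpoint vs. delta snapshot
        final List<String> files;    // e.g. HDFS paths

        Handle(long checkpointId, boolean full, List<String> files) {
            this.checkpointId = checkpointId;
            this.full = full;
            this.files = files;
        }
    }

    private final List<Handle> retained = new ArrayList<>();

    // Register a new checkpoint. Deltas accumulate; a new full checkpoint
    // supersedes the previous full checkpoint and all deltas taken since it,
    // so their handles (and files) can be discarded.
    public List<Handle> register(Handle handle) {
        List<Handle> obsolete = new ArrayList<>();
        if (handle.full) {
            obsolete.addAll(retained);
            retained.clear();
        }
        retained.add(handle);
        return obsolete; // caller would delete these files and prune ZooKeeper
    }
}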

Greetings,
Stephan

