Flink streaming with 1+ TB of managed state

Flink streaming with 1+ TB of managed state

Steven Ruppert
Hi,

Is anybody currently running flink streaming with north of a terabyte
(TB) of managed state? If you are, can you share your experiences wrt
hardware, tuning, recovery situations, etc?

I'm evaluating Flink for a use case that I estimate will need around 5TB
of state in total, but given the current implementation of the RocksDB
state backend and the lack of incremental checkpointing or recovery, it
doesn't seem feasible.

I have successfully tested flink up to roughly 90GB of managed state
in rocksDB, but that's taking 5 minutes to checkpoint or recover (on a
pretty beefy YARN cluster).

In most cases, my state updates are idempotent and can be moved to
something external. However, it'd be nice to know of any current or
future plans for running Flink at the terabyte scale.

--Steven

--
*CONFIDENTIALITY NOTICE: This email message, and any documents, files or
previous e-mail messages attached to it is for the sole use of the intended
recipient(s) and may contain confidential and privileged information. Any
unauthorized review, use, disclosure or distribution is prohibited. If you
are not the intended recipient, please contact the sender by reply email
and destroy all copies of the original message.*

Re: Flink streaming with 1+ TB of managed state

rmetzger0
Hi Steven,

According to this presentation, King.com is using Flink with terabytes of state: http://flink-forward.org/wp-content/uploads/2016/07/Gyulo-Fo%CC%81ra-RBEA-Scalable-Real-Time-Analytics-at-King.compressed.pdf (see Page 4 specifically)

For the 90GB experiment, what is the expected time for transferring 90 GB of data in your environment?
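
One rough way to frame this question is to work out the aggregate transfer rate implied by the figure reported earlier in the thread (about 90 GB checkpointed or restored in about 5 minutes) and extrapolate it to larger state. The sketch below assumes decimal units (1 GB = 1000 MB) and a constant aggregate rate regardless of state size; both are assumptions, not measurements, and the function names are made up for illustration.

```python
# Back-of-the-envelope throughput implied by the numbers in this thread.
# Assumptions: decimal units (1 GB = 1000 MB) and a constant aggregate rate.

def implied_throughput_mb_s(state_gb: float, seconds: float) -> float:
    """Aggregate transfer rate in MB/s needed to move state_gb in `seconds`."""
    return state_gb * 1000 / seconds


def transfer_minutes(state_gb: float, throughput_mb_s: float) -> float:
    """Minutes needed to move state_gb at a fixed aggregate rate."""
    return state_gb * 1000 / throughput_mb_s / 60


rate = implied_throughput_mb_s(90, 5 * 60)  # 90 GB in 5 minutes
print(f"implied aggregate rate: {rate:.0f} MB/s")
for size_gb in (1000, 5000):  # 1 TB and the estimated 5 TB
    minutes = transfer_minutes(size_gb, rate)
    print(f"{size_gb} GB at that rate: {minutes:.0f} min")
```

Under these assumptions the 90 GB test corresponds to roughly 300 MB/s in aggregate. Comparing that figure with what the network and storage can actually sustain shows whether the 5 minutes is dominated by raw data transfer or by something else.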

Regards,
Robert


Re: Flink streaming with 1+ TB of managed state

Gyula Fóra
Hi Steven,

As Robert said, some of our jobs have state sizes of around a TB or more. We use the RocksDB state backend with some configs tuned to perform well on SSDs (you can get some tips here: https://www.youtube.com/watch?v=pvUqbIeoPzM).

We checkpoint our state to Ceph (similar to HDFS, but this is what we have :)), and it takes 15-25 minutes for the larger jobs to checkpoint or restore. As this runs asynchronously in the background, it doesn't hurt our runtime performance. The only problem is the occasional strain on the network, especially when many jobs are restored at the same time.

Incremental checkpoints would definitely be crazy useful in our case, as only a very small percentage of our state is updated between snapshots, but it is still feasible as it is for now.
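
To illustrate why incremental checkpoints help when only a small part of the state changes, here is a toy model. It is not Flink's design (which, per this thread, did not exist in the system yet). It only assumes that local state is stored as immutable files, as RocksDB SST files are, so a file that has already been uploaded never needs to be uploaded again. The file names, sizes, and the `plan_incremental_upload` helper are hypothetical.

```python
# Toy model of an incremental checkpoint, NOT Flink's actual design.
# Assumption: local state is a set of immutable files, so anything uploaded
# by an earlier checkpoint can be referenced instead of re-uploaded.

def plan_incremental_upload(local_files: dict[str, int],
                            already_uploaded: set[str]) -> tuple[list[str], int]:
    """Return (files to upload, bytes to upload) for the next checkpoint."""
    new_files = sorted(name for name in local_files
                       if name not in already_uploaded)
    return new_files, sum(local_files[name] for name in new_files)


# Hypothetical file names and sizes in bytes.
previous = {"000001.sst", "000002.sst", "000003.sst"}
current = {
    "000001.sst": 64_000_000,
    "000002.sst": 64_000_000,
    "000003.sst": 64_000_000,
    "000004.sst": 8_000_000,
}

to_upload, nbytes = plan_incremental_upload(current, previous)
total = sum(current.values())
print(to_upload, f"{nbytes / total:.1%} of the full snapshot")
```

In this model, the checkpoint cost tracks the amount of newly written data rather than the total state size. Compaction rewrites files, so in practice the uploaded volume would be somewhat larger than the logical change set.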

Let me know if I can help with any details.

Cheers,
Gyula

Re: Flink streaming with 1+ TB of managed state

Stephan Ewen
Some background on incremental checkpointing: it is not in the system yet, but we have a fairly advanced design, and some committers/contributors are currently starting the effort.

My personal estimate is that it would be available in some months (Q1 next year).

Best,
Stephan


Re: Flink streaming with 1+ TB of managed state

Steven Ruppert
Some responses inline below:

> On Sat, Nov 19, 2016 at 4:07 PM, Gyula Fóra <[hidden email]> wrote:
>
> Hi Steven,
>
> As Robert said some of our jobs have state sizes around a TB or
> more. We use the RocksDB state backend with some configs tuned to
> perform well on SSDs (you can get some tips here:
> https://www.youtube.com/watch?v=pvUqbIeoPzM).
>
> We checkpoint our state to Ceph (similar to HDFS but this is what we
>  have :)), and it takes 15-25 minutes for the larger jobs to perform
>  the checkpoints/restore. As this runs async in the background it
> doesnt hurt our runtime performance, the only problems are with the
> strain on the network sometimes especially when many jobs are
> restored at the same time.
>
> Incremental checkpoints would definitely be crazy useful in our case
>  as only a very small percentage of our state is updated between
> snapshots but it is still feasible as it is for now.
>
> Let me know if I can help with any details.
>
> Cheers, Gyula

Thanks Gyula. I do have some additional questions if you are able to answer:

1. How often do you checkpoint/savepoint your jobs? Do you "pipeline"
checkpoints and allow multiple transfers to happen at the same time?

2. Do you have any additional fault tolerance layers in your jobs? It
seems like if some hardware fault or software bug does manage to fail
the job, it's at least 15-25 minutes (plus catchup time) until the job
is available again.

3. On slide 14 of your presentation, you mention an LRU cache in front
of the RocksDB state. Can you give any additional details about that?
Are there any particular deficiencies in the vanilla RocksDB state
backend that the LRU cache works around?

4. (This is perhaps a more general flink question) If you make a change
to your jobs that requires recreating the entire 1+ TB of state from the
beginning of the input, do you do anything special to backfill the 1TB
of state, or do you simply run the same streaming job from the beginning?

There is Uber's presentation on this from Flink Forward 2016:
https://youtu.be/9mjAPBNl4YM . I'm curious if you have any other techniques.

***

For the project for which I'm currently vetting Flink, I'm actually not
so concerned with the performance of the RocksDB state backend itself,
either the read/update/write performance or the checkpoint data transfer
performance. I was testing with async checkpointing, and it does seem
feasible to run checkpoints relatively frequently.

I'm still a little concerned that if it takes upwards of 10 minutes
to checkpoint 1TB of state (and, presumably, about as long to restore
it), the downtime in case of a failure is at least 10 minutes, which is
hard to work around.
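
The downtime concern can be made a little more concrete with a small model: after a failure, the job has to restore its state and then reprocess everything that arrived since the last completed checkpoint, plus whatever arrives during the restore. If the job can process input `speedup` times faster than it arrives, a backlog of B minutes drains in B / (speedup - 1) minutes. The numbers below are hypothetical, and the model ignores failure detection and job restart overhead.

```python
def catch_up_minutes(backlog_minutes: float, speedup: float) -> float:
    """Time to drain a backlog when the job processes `speedup` times
    faster than new input arrives.

    While catching up for T minutes, T more minutes of input arrive, so
    speedup * T = backlog + T, which gives T = backlog / (speedup - 1).
    """
    if speedup <= 1:
        raise ValueError("job cannot catch up unless speedup > 1")
    return backlog_minutes / (speedup - 1)


def time_until_caught_up(restore_minutes: float,
                         since_checkpoint_minutes: float,
                         speedup: float) -> float:
    # Input to reprocess: everything since the last checkpoint, plus what
    # arrives while the state is being restored.
    backlog = since_checkpoint_minutes + restore_minutes
    return restore_minutes + catch_up_minutes(backlog, speedup)


# Hypothetical: 10 min restore, failure 30 min after the last checkpoint,
# and a job that can process input twice as fast as it arrives.
print(time_until_caught_up(10, 30, 2.0))
```

With these made-up inputs, the job is fully caught up 50 minutes after the failure, so the checkpoint interval and the processing headroom matter at least as much as the restore time itself.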

On 11/21/2016 07:47 AM, Stephan Ewen wrote:
> Some background in the Incremental Checkpointing: It is not in the
> system, but we have a quite advanced design and some
> committers/contributors are currently starting the effort.

That sounds good. Can you link to any publicly available design docs and
code PRs/branches? I'm pretty sure I came across them before, but my
searching is failing me at the moment.

--Steven

Re: Flink streaming with 1+ TB of managed state

Gyula Fóra
Hi Steven,

Let me try to address your questions :)

1. We take checkpoints approximately every hour for these large states to reduce the strain on our network. Obviously, with incremental checkpoints we would go down to every couple of minutes.

2. We don't have anything additional, and you are right, recovery time is probably the biggest factor here. For the largest jobs, a 15-30 minute recovery is expected even if everything goes well, and it can be much worse if we are unlucky.

3. Almost all the time there is a relatively low number of active keys (compared to the overall state size), which we can mostly keep in memory. We could probably get a fairly good approximation of this performance with careful tuning of the block cache, but adding our own cache actually seemed much easier and gave us some good performance improvements.

4. We try to stay within the boundaries of the savepoints for most changes. If we really have to make a state-breaking change, we usually drop the state (so we continue from the live offsets). This is of course not ideal in many cases, so we are looking into options like the one you mentioned to backfill with historical data.
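
As an illustration of point 3, here is a minimal sketch of an LRU cache placed in front of a slower key-value store. It is not King's implementation. The `LruCachedStore` class is hypothetical, a plain dict stands in for the RocksDB state, and the write-through policy is an assumption chosen for simplicity.

```python
from collections import OrderedDict


class LruCachedStore:
    """Write-through LRU cache in front of a slower key-value store.

    `backing` is any dict-like object; here it stands in for RocksDB state.
    """

    def __init__(self, backing, capacity):
        self.backing = backing
        self.capacity = capacity
        self.cache = OrderedDict()
        self.hits = 0
        self.misses = 0

    def get(self, key, default=None):
        if key in self.cache:
            self.cache.move_to_end(key)  # mark as most recently used
            self.hits += 1
            return self.cache[key]
        self.misses += 1
        if key not in self.backing:
            return default
        value = self.backing[key]
        self._remember(key, value)
        return value

    def put(self, key, value):
        self.backing[key] = value  # write-through: the store is always current
        self._remember(key, value)

    def _remember(self, key, value):
        self.cache[key] = value
        self.cache.move_to_end(key)
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)  # evict the least recently used key


store = LruCachedStore(backing={}, capacity=2)
store.put("a", 1)
store.put("b", 2)
store.put("c", 3)  # "a" is evicted from the cache but stays in the store
store.get("c")     # served from the cache
store.get("a")     # cache miss, read from the backing store
print(store.hits, store.misses)
```

Because every write goes straight to the backing store in this sketch, a snapshot of the store never depends on flushing the cache first. A write-back cache would save more writes but would need a flush before each checkpoint.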

Gyula
