Hi,
Is anybody currently running Flink streaming with north of a terabyte (TB) of managed state? If you are, can you share your experiences with respect to hardware, tuning, recovery situations, etc.?

I'm evaluating Flink for a use case I estimate will take around 5 TB of state in total, but looking at the actual implementation of the RocksDB state backend and the current lack of incremental checkpointing or recovery, it doesn't seem feasible. I have successfully tested Flink with roughly 90 GB of managed state in RocksDB, but that takes 5 minutes to checkpoint or recover (on a pretty beefy YARN cluster). For most cases, my state updates are idempotent and could be moved to something external. However, it would be nice to know of any current or future plans for running Flink at the terabyte scale.

--Steven
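For context, a minimal sketch of the kind of setup being described here: the RocksDB state backend holding the managed keyed state, with periodic checkpoints written to a DFS path. The checkpoint URI, interval, and job name are placeholders rather than values from this thread, and this is not Steven's actual job.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LargeStateJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 5 minutes; interval and HDFS path are placeholders.
        env.enableCheckpointing(5 * 60 * 1000);
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

        // ... sources, keyed operators holding the managed state, sinks ...

        env.execute("large-state-job");
    }
}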
Hi Steven,

According to this presentation, King.com is using Flink with terabytes of state: http://flink-forward.org/wp-content/uploads/2016/07/Gyulo-Fo%CC%81ra-RBEA-Scalable-Real-Time-Analytics-at-King.compressed.pdf (see page 4 specifically).

For the 90 GB experiment, what is the expected time for transferring 90 GB of data in your environment?

Regards,
Robert

On Sat, Nov 19, 2016 at 1:41 AM, Steven Ruppert <[hidden email]> wrote:
Hi Steven,

As Robert said, some of our jobs have state sizes around a TB or more. We use the RocksDB state backend with some configs tuned to perform well on SSDs (you can get some tips here: https://www.youtube.com/watch?v=pvUqbIeoPzM).

We checkpoint our state to Ceph (similar to HDFS, but this is what we have :)), and it takes 15-25 minutes for the larger jobs to perform the checkpoints/restore. As this runs async in the background it doesn't hurt our runtime performance; the only problems are the strain on the network sometimes, especially when many jobs are restored at the same time.

Incremental checkpoints would definitely be crazy useful in our case, as only a very small percentage of our state is updated between snapshots, but it is still feasible as it is for now.

Let me know if I can help with any details.

Cheers,
Gyula

Robert Metzger <[hidden email]> wrote (on Sat, Nov 19, 2016, 13:16):
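For reference, the SSD tuning Gyula mentions can start from Flink's predefined RocksDB option profiles. A minimal sketch follows; the checkpoint URI is a placeholder, and real tuning along the lines of the linked talk would go well beyond this one call.

import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SsdTunedBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder checkpoint URI.
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
        // Predefined option profile aimed at flash/SSD storage; per-job tuning can override it.
        backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
        env.setStateBackend(backend);

        // ... job graph ...
        env.execute("ssd-tuned-job");
    }
}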
Some background on incremental checkpointing: it is not in the system yet, but we have a quite advanced design, and some committers/contributors are currently starting the effort. My personal estimate is that it will be available in a few months (Q1 next year).

Best,
Stephan

On Sat, Nov 19, 2016 at 4:07 PM, Gyula Fóra <[hidden email]> wrote:
Some responses inline below:
> On Sat, Nov 19, 2016 at 4:07 PM, Gyula Fóra <[hidden email]> wrote:
> [...]

Thanks Gyula. I do have some additional questions, if you are able to answer:

1. How often do you checkpoint/savepoint your jobs? Do you "pipeline" checkpoints and allow multiple transfers to happen at the same time?

2. Do you have any additional fault-tolerance layers in your jobs? It seems like if some hardware fault or software bug does manage to fail the job, it's at least 15-25 minutes (plus catch-up time) until the job is available again.

3. In slide 14 of your presentation, you mention an LRU cache in front of the RocksDB state. Can you give any additional details about that? Are there any particular deficiencies in the vanilla RocksDB state backend that the LRU cache works around?

4. (This is perhaps a more general Flink question.) If you make a change to your jobs that requires recreating the entire 1+ TB of state from the beginning of the input, do you do anything special to backfill the 1 TB of state, or do you simply run the same streaming job from the beginning? There is Uber's presentation on this from Flink Forward 2016: https://youtu.be/9mjAPBNl4YM . I'm curious if you have any other techniques.

***

With the project for which I'm currently vetting Flink, I'm actually not so concerned with the performance of the RocksDB state backend itself, neither the read/update/write performance nor the checkpointed data transfer performance. I was testing with async checkpointing, and it does seem feasible to have checkpoints running relatively frequently. I'm still a little concerned that if it takes upwards of 10 minutes to checkpoint 1 TB of state, the downtime in case of a failure is at least 10 minutes, which is hard to work around.

On 11/21/2016 07:47 AM, Stephan Ewen wrote:
> Some background on incremental checkpointing: it is not in the system yet,
> but we have a quite advanced design, and some committers/contributors are
> currently starting the effort.

That sounds good. Can you link to any publicly available design docs and code PRs/branches? I'm pretty sure I came across them before, but my searching is failing me at the moment.

--Steven
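For question 1, the relevant knobs live in Flink's CheckpointConfig. A rough sketch is below; the numbers are placeholders, not values anyone in this thread reported, and "pipelining" checkpoints corresponds to raising the max concurrent checkpoints.

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(60 * 60 * 1000);            // e.g. hourly checkpoints
        CheckpointConfig cfg = env.getCheckpointConfig();
        cfg.setMaxConcurrentCheckpoints(1);                 // raise to let checkpoints overlap ("pipeline")
        cfg.setMinPauseBetweenCheckpoints(10 * 60 * 1000);  // breathing room between large transfers
        cfg.setCheckpointTimeout(30 * 60 * 1000);           // allow slow DFS uploads to complete

        // ... job graph ...
        env.execute("checkpoint-tuning-sketch");
    }
}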
Hi Steven,

Let me try to address your questions :)

1. We take checkpoints approximately every hour for these large states, to remove some strain from our networks. Obviously, with incremental checkpoints we would go down to every couple of minutes.

2. We don't have anything additional, and you are right, recovery time is probably the biggest factor here. For the largest jobs, 15-30 minutes of recovery is expected even if everything goes well, but it can be much worse if we are unlucky.

3. Almost all the time there is a relatively low number of active keys (compared to the overall state size) that we can mostly keep in memory. We could probably get a fairly good approximation of this performance with careful tuning of the block cache, but the cache actually seemed much easier and gave some good performance improvements.

4. We try to stay within the boundaries of the savepoints for most changes; if we really have to make a state-breaking change, we usually drop the state (so we continue from the live offsets). This is of course not ideal in many cases, so we are looking into options like the one you mentioned to backfill with historical data.

Gyula

Steven Ruppert <[hidden email]> wrote (on Mon, Nov 21, 2016, 18:44):
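To make answer 3 concrete, here is a rough illustration of the "LRU cache in front of RocksDB" idea: keep the hot keys in a small on-heap map and fall back to the RocksDB-backed ValueState only on a miss, writing through so checkpoints always see the latest value. This is not King's actual implementation; the class, names, and cache size are made up, state-descriptor constructors vary slightly between Flink versions, and the function is meant to run on a keyed stream.

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Use on a keyed stream, e.g. stream.keyBy(0).flatMap(new CachedCounterSketch()).
public class CachedCounterSketch extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private static final int CACHE_SIZE = 100_000;          // assumption: sized to fit the heap

    private transient ValueState<Long> counts;              // RocksDB-backed keyed state
    private transient LinkedHashMap<String, Long> hotKeys;  // on-heap LRU over recently seen keys

    @Override
    public void open(Configuration parameters) {
        counts = getRuntimeContext().getState(new ValueStateDescriptor<>("counts", Long.class));
        hotKeys = new LinkedHashMap<String, Long>(CACHE_SIZE, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                return size() > CACHE_SIZE;                  // plain LRU eviction
            }
        };
    }

    @Override
    public void flatMap(Tuple2<String, Long> in, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = hotKeys.get(in.f0);
        if (current == null) {
            current = counts.value();                        // cache miss: read (and deserialize) from RocksDB
            if (current == null) {
                current = 0L;
            }
        }
        current += in.f1;
        hotKeys.put(in.f0, current);
        counts.update(current);                              // write through so snapshots stay consistent
        out.collect(Tuple2.of(in.f0, current));
    }
}

The write-through keeps RocksDB authoritative, so the cache only spares the read/deserialize path for hot keys, which is one plausible reading of the gain Gyula describes over block-cache tuning alone.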