Hi all,
I'm running Flink on EMR/YARN with 2x m3.xlarge instances and am checkpointing a fairly large RocksDB state to S3. I've found that when the state size hits 10GB, the checkpoint takes around 6 minutes, according to the Flink dashboard. Originally my checkpoint interval was 5 minutes for the job, but I've found that the YARN container crashes (I guess because the checkpoint time is greater than the checkpoint interval), so have now decreased the checkpoint frequency to every 10 minutes. I was just wondering if anyone has any tips about how to reduce the checkpoint time. Taking 6 minutes to checkpoint ~10GB state means it's uploading at ~30MB/sec. I believe the m3.xlarge instances should have around 125MB/sec network bandwidth each, so I think the bottleneck is S3. Since there are 2 instances, I'm not sure if that means each instance is uploading at 15MB/sec - do the state uploads get shared equally among the instances, assuming the state is split equally between the task managers? If the state upload is split between the instances, perhaps the only way to speed up the checkpoints is to add more instances and task managers, and split the state equally among the task managers? Also just wondering - is there any chance the incremental checkpoints work will be complete any time soon? Thanks, Josh |
Hi Josh, Checkpoints that take longer than the checkpoint interval should not be an issue (if you use an up-to-date version of Flink). The checkpoint coordinator will not issue another checkpoint while another one is still ongoing. Is there maybe some additional data for the crashes? A log perhaps? Regarding upload speed, yes, each instance of an operator is responsible for uploading its state so if state is equally distributed between operators on TaskManagers that would mean that each TaskManager would upload roughly the same amount of state. It might be interesting to see what the raw upload speed is when you have those to VMs upload to S3, if it is a lot larger than the speed you're seeing something would be wrong and we should investigate. One last thing: are you using the "fully async" mode of RocksDB? I think I remember that you do, just checking. If it is indeed a problem of upload speed to S3 per machine then yes, using more instances should speed up checkpointing. About incremental checkpoints: they're not going to make it into 1.2 with the current planning but after that, I don't know yet. Cheers, Aljoscha On Mon, 24 Oct 2016 at 19:06 Josh <[hidden email]> wrote:
|
Hi Aljoscha, Thanks for the reply! I found that my stateful operator (with parallelism 10) wasn't equally split between the task managers on the two nodes (it was split 9/1) - so I tweaked the task manager / slot configuration until Flink allocated them equally with 5 instances of the operator on each node. (Just wondering if there's a better way to get Flink to allocate this specific operator equally between nodes, regardless of the number of slots available on each?) Having split the stateful operator equally between 2 nodes, I am actually able to checkpoint 18.5MB of state in ~4 minutes. Which indicates an overall throughput of ~77MB/sec (38.5MB/sec per node). I did what you said and tried uploading a large file from one of those VMs to S3 using the AWS command line tool. It uploaded at a speed of ~76MB/sec. Which is nearly double 38MB/sec but at least it's not orders of magnitude out. Does that sound ok? - I guess there's more that goes on when Flink takes a checkpoint than just uploading anyway... I've upgraded my cluster to Flink 1.2-SNAPSHOT yesterday so yeah should be using the fully async mode. I'll have a proper look in the logs if I see it crash again, and for now will just add more nodes whenever we need to speed up the checkpointing. Thanks, Josh On Tue, Oct 25, 2016 at 3:12 PM, Aljoscha Krettek <[hidden email]> wrote:
|
Hi Josh, might the bandwidth to S3 be shared by all the running nodes? (Not sure how that is setup, so I'm just guessing here.) If you're on 1.2-SNAPSHOT you should also get fully elastic jobs in about a week. (I'm talking about the ability to restart from a savepoint with a different parallelism here.) Keyed state and the Kafka source are already properly elastic, the only missing bit are the timers (for the window operator, mostly) but we're currently working on making that elastic as well. Cheers, Aljoscha On Tue, 25 Oct 2016 at 16:43 Josh <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |