All,
It looks like Flink's default behavior is to restart all operators when a single operator fails - in my case a Kafka producer timing out. When this happens, the logs show that all operators are restarted, which essentially leads to data loss for us. The volume of data is so high that checkpointing is becoming very expensive. I was wondering whether Flink has a lifecycle hook that would let us force a checkpoint before operators are restarted. That would solve a dire production issue for us.

Thanks,
-- Ashish
Hi Ashish,
Could you elaborate a bit more on why you think the restart of all operators leads to data loss?

When a restart occurs, Flink restarts the job from the latest complete checkpoint. All operator state is reloaded from the state written in that checkpoint, and the position in the input stream is rewound as well.

I don't think there is a way to force a checkpoint before the restart occurs, but as mentioned, that should not be required, because the last complete checkpoint will be used. Am I missing something in your particular setup?

Cheers,
Gordon
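For context, a minimal sketch of what Gordon's answer assumes - checkpointing enabled on the job plus a restart strategy - written against the DataStream API. The interval, mode, and retry values are illustrative only and are not taken from Ashish's setup:

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointConfigSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Take a checkpoint every 60s; on failure the job is restored from the
            // latest complete checkpoint and the sources are rewound accordingly.
            env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

            // Retry a few times with a delay instead of failing the job permanently.
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));

            // ... add sources, transformations, and sinks here, then call env.execute(...)
        }
    }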
Hi Gordon,
The issue really is that we are trying to avoid checkpointing, since the datasets are heavy and all of the state in a few of our apps is really transient (flushed within a few seconds). The high volume/velocity and the transient nature of the state make those apps good candidates for simply not having checkpoints. We do have offsets committed to Kafka, and we have "some" tolerance for gaps/duplicates.

However, we do want to handle "graceful" restarts/shutdowns. For shutdown we have been taking savepoints (which works great), but for restart we just can't find a way. Bottom line - we are trading off resiliency for resource utilization and performance, but we would like to harden the apps for production deployments as much as we can. Hope that makes sense.

Thanks,
Ashish
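For illustration, a hedged sketch of the pattern Ashish describes: checkpointing left disabled, with offsets committed back to Kafka through the connector's auto-commit path. The broker address, topic, group id, and connector version (Kafka 0.11) are assumptions, not taken from his jobs:

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

    public class NoCheckpointKafkaSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Deliberately no env.enableCheckpointing(...): state is treated as transient.

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092");     // placeholder
            props.setProperty("group.id", "transient-grouping-app");  // placeholder
            // With Flink checkpointing off, the connector falls back to Kafka's periodic
            // offset commits, which is where the tolerated gap/duplicate window comes from.
            props.setProperty("enable.auto.commit", "true");
            props.setProperty("auto.commit.interval.ms", "5000");

            env.addSource(new FlinkKafkaConsumer011<>("events", new SimpleStringSchema(), props));
            // ... transformations, sinks, env.execute(...)
        }
    }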
Hi,
Have you looked into fine-grained recovery?
https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures

Stefan (cc'ed) might be able to give you some pointers about the configuration.

Best,
Aljoscha
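As a pointer for later readers (not part of the original reply): the FLIP-1 work is surfaced through a failover-strategy setting in flink-conf.yaml. Which strategies exist and which one is the default depends on the Flink version in use; a sketch, assuming a release that ships regional failover:

    # flink-conf.yaml (sketch): restart only the affected region instead of the
    # full job graph. Strategy names and defaults vary by Flink version.
    jobmanager.execution.failover-strategy: region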
If I understand fine-grained recovery correctly, one would still need to take checkpoints. Ashish would like to avoid checkpointing and accepts losing the state of the failed task. However, he would like to avoid losing more state than necessary due to the restarting of tasks that did not fail.
Thanks Fabian!
Yes, that is exactly what we are looking to achieve. I looked at the fine-grained recovery FLIP, but I am not sure it will do the trick. As Fabian mentioned, we haven't been enabling checkpointing (reasons below). I understand it might not always be possible to take a checkpoint of an operator that is failing, but as long as the whole job graph is not restarted and only the failing operator is restarted, even if checkpointing is not enabled, I feel that will do the trick. It is "acceptable" to lose state on that failing operator. Further, if a lifecycle hook were provided in operators, say restart (similar to open/close), app developers could attempt to checkpoint state (if a mechanism is provided to do so programmatically) before restarting. Just some thoughts there...

Back to our scenario: a lot of the high-volume datasets we are processing generally require a few events to be grouped by key, and those events arrive within a few seconds (if not milliseconds). However, a low percentage of events arrive late, or endpoints just can't send all the grouped events fast enough, so events sit in operator memory until all the events in the group arrive or a configured timeout is reached. We are talking about hundreds of thousands of endpoints (soon to be millions) streaming data at high volume. Hence, we are currently not even enabling checkpointing and are relying on Kafka auto commits for the most part if apps need to be restarted (we were hoping to avoid perf issues and resource constraints; also, given the transient nature of the datasets, the benefits of not checkpointing seemed higher). However, a single operator failure causing the entire job graph to restart is causing data loss.

I think it is necessary to point out that we have slight leeway here, in the sense that it is "okay" to have a little data loss (e.g. in the operator that is actually failing) or some duplicates (say one of the Kafka consumers crashed). However, what we are running into is that one operator failing causes data loss in hundreds of operators that are running in parallel. We would really like to avoid that data loss.

Thanks,
Ashish
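To make the scenario concrete for other readers, here is a rough sketch of the grouping-with-timeout pattern described above, written against the KeyedProcessFunction API. The class name, event type, expected group size, and timeout are all assumptions for illustration; the buffered state in this function is exactly what gets lost on a restart when checkpointing is disabled:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Buffers events per key and flushes either when the group is "complete"
    // (here: a fixed expected size, an assumption) or when a timeout fires.
    public class GroupWithTimeoutSketch extends KeyedProcessFunction<String, String, List<String>> {

        private static final int EXPECTED_GROUP_SIZE = 4;   // assumed
        private static final long TIMEOUT_MS = 5_000L;      // the "configured timeout"

        private transient ListState<String> buffer;

        @Override
        public void open(Configuration parameters) {
            buffer = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("buffer", String.class));
        }

        @Override
        public void processElement(String event, Context ctx, Collector<List<String>> out) throws Exception {
            buffer.add(event);
            List<String> events = drain();
            if (events.size() >= EXPECTED_GROUP_SIZE) {
                // Group complete: emit early and clear the per-key buffer.
                out.collect(events);
                buffer.clear();
            } else {
                // Arm a timeout so stragglers don't keep the group in memory forever.
                ctx.timerService().registerProcessingTimeTimer(
                        ctx.timerService().currentProcessingTime() + TIMEOUT_MS);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<String>> out) throws Exception {
            // Timeout reached: emit whatever arrived so far.
            List<String> events = drain();
            if (!events.isEmpty()) {
                out.collect(events);
                buffer.clear();
            }
        }

        private List<String> drain() throws Exception {
            List<String> events = new ArrayList<>();
            Iterable<String> buffered = buffer.get();
            if (buffered != null) {
                buffered.forEach(events::add);
            }
            return events;
        }
    }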
Currently there is only a time-based way to trigger a checkpoint. Based on this discussion, I think Flink needs to introduce an event-based way to trigger checkpoints, such that restarting a task manager would count as an event.
Well, that's not that easy to do, because checkpoints must be coordinated and triggered by the JobManager. Also, the checkpointing mechanism with flowing checkpoint barriers (to ensure checkpoint consistency) won't work once a task has failed, because it cannot continue processing and forwarding barriers. If the task failed with an OOME, the whole JVM is gone anyway.

I don't think it is possible to take something like a consistent rescue checkpoint in case of a failure. It might be possible to checkpoint the application state of non-failed tasks, but this would result in data loss for the failed task, and we would need to weigh the use cases for such a feature against the implementation effort. Maybe there are better ways to address such use cases.

Best,
Fabian
I definitely like the idea of event-based checkpointing :)

Fabian, I do agree with your point that it is not possible to take a rescue checkpoint consistently. The concern here, however, is not about the operator that actually failed. It's about avoiding data loss across the hundreds (probably thousands) of parallel operators that are being restarted and are "healthy". We have 100k elements (nearing a million soon) pushing data. Losing a few seconds' worth of data for a few of them is not good but "acceptable", as long as the damage can be controlled. Of course, we are going to use RocksDB + two-phase commit with Kafka where we need exactly-once guarantees. The fine-grained recovery proposal (the FLIP-1 link Aljoscha shared, https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures) seems relevant here.

Thanks,
Ashish
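For reference, a hedged sketch of the RocksDB + two-phase-commit combination mentioned above, using the Kafka 0.11 producer that was current at the time. The checkpoint path, broker address, topic, and property values are placeholders, not Ashish's configuration:

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

    public class ExactlyOnceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // RocksDB keeps keyed state on local disk; the URI is a placeholder checkpoint path.
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
            env.enableCheckpointing(60_000);

            Properties producerProps = new Properties();
            producerProps.setProperty("bootstrap.servers", "kafka:9092");  // placeholder
            // Transaction timeout must cover the time between checkpoints (broker caps apply).
            producerProps.setProperty("transaction.timeout.ms", "900000");

            // Two-phase-commit sink: records become visible only when the checkpoint completes.
            FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
                    "output-topic",                                        // placeholder topic
                    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                    producerProps,
                    FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);

            // ... source and transformations would go here: stream.addSink(producer); env.execute(...)
        }
    }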
Hi Ashish,

Agreed! I think the right approach would be to gather the requirements and start a discussion on the dev mailing list. For instance (not sure whether this is feasible or solves the problem), one could do only local checkpoints and not write to the distributed persistent storage. That would bring down checkpointing costs, and the recovery life cycle would not need to be radically changed.
Fabian, that sounds good. Should I recap some bullets in an email and start a new thread then?
Thanks, Ashish
Yes, that would be great!

Thank you,
Fabian