Restart hook and checkpoint

Restart hook and checkpoint

Ashish Pokharel
All,

It looks like Flink's default behavior is to restart all operators on a single operator error - in my case it is a Kafka Producer timing out. When this happens, I see in the logs that all operators are restarted. This essentially leads to data loss. In my case the volume of data is so high that it is becoming very expensive to checkpoint. I was wondering if Flink has a lifecycle hook to which we could attach a forced checkpoint before operators are restarted. That would solve a dire production issue for us.

Thanks,

-- Ashish

Re: Restart hook and checkpoint

Tzu-Li Tai
Hi Ashish,

Could you elaborate a bit more on why you think the restart of all operators
leads to data loss?

When a restart occurs, Flink will restart the job from the latest complete
checkpoint.
All operator states will be reloaded with the state written in that checkpoint,
and the position of the input stream will also be rewound.
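
The recovery behavior described above can be illustrated with a small toy model. This is a sketch in plain Java, not Flink code: the class `RestoreSketch` and all of its fields and methods are made up for illustration, and the "source" is just an in-memory list standing in for a replayable input such as a Kafka partition.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of checkpoint-based recovery; hypothetical names, not Flink code. */
class RestoreSketch {
    private final List<Integer> input;   // replayable source, e.g. a Kafka partition
    private int offset = 0;              // next record to read
    private long sum = 0;                // operator state

    private int checkpointedOffset = 0;  // source position in the last checkpoint
    private long checkpointedSum = 0;    // operator state in the last checkpoint

    RestoreSketch(List<Integer> input) {
        this.input = input;
    }

    /** Processes up to n records from the current read position. */
    void process(int n) {
        for (int i = 0; i < n && offset < input.size(); i++) {
            sum += input.get(offset++);
        }
    }

    /** Records state and read position together. */
    void checkpoint() {
        checkpointedOffset = offset;
        checkpointedSum = sum;
    }

    /** Simulates a restart: state and read position both go back to the checkpoint. */
    void restart() {
        offset = checkpointedOffset;
        sum = checkpointedSum;
    }

    long sum() { return sum; }

    int offset() { return offset; }

    public static void main(String[] args) {
        RestoreSketch job = new RestoreSketch(Arrays.asList(1, 2, 3, 4, 5));
        job.process(2);    // reads 1, 2
        job.checkpoint();  // snapshot: offset 2, sum 3
        job.process(2);    // reads 3, 4
        job.restart();     // failure: back to offset 2, sum 3
        job.process(3);    // replays 3, 4, then reads 5
        System.out.println(job.sum()); // prints 15
    }
}
```

Because the read position is rewound together with the state, the records processed after the checkpoint are replayed rather than lost. Without any checkpoint, the same restart would leave nothing to restore the state from.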

I don't think there is a way to force a checkpoint before restarting occurs,
but as I mentioned, that should not be required, because the last complete
checkpoint will be used.
Am I missing something in your particular setup?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Restart hook and checkpoint

Ashish Pokharel
Hi Gordon,

The issue really is that we are trying to avoid checkpointing, as the datasets are really heavy and all of the state is really transient in a few of our apps (flushed within a few seconds). So the high volume/velocity and the transient nature of the state make those apps good candidates to just not have checkpoints.

We do have offsets committed to Kafka AND we have “some” tolerance for gaps / duplicates. However, we do want to handle “graceful” restarts / shutdowns. For shutdown, we have been taking savepoints (which works great), but for restart we just can’t find a way.

Bottom line - we are trading off resiliency for resource utilization and performance but would like to harden apps for production deployments as much as we can.

Hope that makes sense.

Thanks, Ashish

> On Mar 6, 2018, at 10:19 PM, Tzu-Li Tai <[hidden email]> wrote:
>
> Hi Ashish,
>
> Could you elaborate a bit more on why you think the restart of all operators
> leads to data loss?
>
> When a restart occurs, Flink will restart the job from the latest complete
> checkpoint.
> All operator states will be reloaded with the state written in that checkpoint,
> and the position of the input stream will also be rewound.
>
> I don't think there is a way to force a checkpoint before restarting occurs,
> but as I mentioned, that should not be required, because the last complete
> checkpoint will be used.
> Am I missing something in your particular setup?
>
> Cheers,
> Gordon

Re: Restart hook and checkpoint

Aljoscha Krettek
Hi,


Stefan (cc'ed) might be able to give you some pointers about configuration.

Best,
Aljoscha

On 6. Mar 2018, at 22:35, Ashish Pokharel <[hidden email]> wrote:

Hi Gordon,

The issue really is that we are trying to avoid checkpointing, as the datasets are really heavy and all of the state is really transient in a few of our apps (flushed within a few seconds). So the high volume/velocity and the transient nature of the state make those apps good candidates to just not have checkpoints.

We do have offsets committed to Kafka AND we have “some” tolerance for gaps / duplicates. However, we do want to handle “graceful” restarts / shutdowns. For shutdown, we have been taking savepoints (which works great), but for restart we just can’t find a way.

Bottom line - we are trading off resiliency for resource utilization and performance but would like to harden apps for production deployments as much as we can.

Hope that makes sense.

Thanks, Ashish


Re: Restart hook and checkpoint

Fabian Hueske-2
If I understand fine-grained recovery correctly, one would still need to take checkpoints.

Ashish would like to avoid checkpointing and accepts losing the state of the failed task.
However, he would like to avoid losing more state than necessary due to the restart of tasks that did not fail.
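
To make the difference concrete, here is a toy comparison in plain Java. It is not Flink code: `RestartScopeSketch` and its methods are hypothetical, each parallel task is reduced to a count of buffered records that exist only in memory, and data in flight between tasks is ignored.

```java
import java.util.Arrays;

/** Toy model, hypothetical names: in-memory state of parallel tasks, no checkpoints. */
class RestartScopeSketch {
    /** Number of buffered (not yet flushed) records per parallel task. */
    private final int[] buffered;

    RestartScopeSketch(int[] buffered) {
        this.buffered = buffered.clone();
    }

    /** Restarts every task: all in-memory state is dropped. Returns records lost. */
    int restartAll() {
        int lost = Arrays.stream(buffered).sum();
        Arrays.fill(buffered, 0);
        return lost;
    }

    /** Restarts only the failed task: only its state is dropped. Returns records lost. */
    int restartOnly(int failedTask) {
        int lost = buffered[failedTask];
        buffered[failedTask] = 0;
        return lost;
    }

    public static void main(String[] args) {
        int[] state = {5, 7, 3};
        System.out.println(new RestartScopeSketch(state).restartAll());   // prints 15
        System.out.println(new RestartScopeSketch(state).restartOnly(1)); // prints 7
    }
}
```

With hundreds of parallel tasks, the gap between the two numbers is the loss Ashish wants to avoid.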

Best, Fabian

2018-03-15 1:45 GMT+01:00 Aljoscha Krettek <[hidden email]>:
Hi,


Stefan (cc'ed) might be able to give you some pointers about configuration.

Best,
Aljoscha



Re: Restart hook and checkpoint

Ashish Pokharel
Thanks Fabian!

Yes, that is exactly what we are looking to achieve. I looked at the fine-grained recovery FLIP but am not sure if that will do the trick. Like Fabian mentioned, we haven’t been enabling checkpointing (reasons below). I do understand it might not always be possible to actually take a checkpoint of an operator that is failing, but as long as the whole job graph is not restarted and only the failing operator is restarted, EVEN IF checkpointing is not enabled, I feel like that will do the trick. It is “acceptable” to lose state on that failing operator. Further, if a lifecycle hook is provided in operators, say restart (similar to open / close), perhaps app developers can make an attempt to checkpoint state (if a mechanism is provided to do so programmatically) before restarting. Just some thoughts there…

Back to our scenario - a lot of the high-volume datasets we are processing generally require a few events to be grouped by key, but those events arrive within a few seconds (if not milliseconds) of each other. However, a small percentage of events arrive late, or endpoints just can’t send all the events of a group fast enough, and hence those events stay in operator memory until all the events in the group arrive or a configured timeout is reached. We are talking about 100s of thousands of endpoints (soon to be millions, actually) streaming data at high volume here. Hence, we are currently not even enabling checkpointing and are relying on Kafka auto commits for the most part if apps need to be restarted (we were hoping to avoid perf issues and resource constraints - also, because of the transient nature of the datasets, the benefits of not checkpointing seemed higher). However, a single operator failure causing the entire job graph to restart is causing data loss. I think it is necessary to point out that we have slight leeway here, in the sense that it is “okay” to have a little data loss (e.g. data loss in the operator that is actually failing) or some duplicates (say, one of the Kafka consumers crashed). However, what we are running into is that one operator failing causes data loss in 100s of operators that are running in parallel. We would really like to avoid that data loss.
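
For readers trying to picture the state involved, the grouping described above can be sketched roughly as follows. This is plain Java, not our actual job and not Flink code: `GroupBufferSketch`, the fixed `groupSize`, and the explicit `nowMs` timestamps are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Toy model, hypothetical names: group events by key until complete or timed out. */
class GroupBufferSketch {
    private static final class Group {
        final long firstSeenMs;
        final List<String> events = new ArrayList<>();

        Group(long firstSeenMs) {
            this.firstSeenMs = firstSeenMs;
        }
    }

    private final int groupSize;   // assumed: number of events expected per key
    private final long timeoutMs;  // assumed: maximum time to wait for a group
    private final Map<String, Group> pending = new HashMap<>();

    GroupBufferSketch(int groupSize, long timeoutMs) {
        this.groupSize = groupSize;
        this.timeoutMs = timeoutMs;
    }

    /** Adds an event; returns the completed group, or null if it is still incomplete. */
    List<String> add(String key, String event, long nowMs) {
        Group g = pending.computeIfAbsent(key, k -> new Group(nowMs));
        g.events.add(event);
        if (g.events.size() >= groupSize) {
            pending.remove(key);
            return g.events;
        }
        return null;
    }

    /** Emits and removes every group that has waited at least timeoutMs. */
    List<List<String>> flushExpired(long nowMs) {
        List<List<String>> out = new ArrayList<>();
        Iterator<Group> it = pending.values().iterator();
        while (it.hasNext()) {
            Group g = it.next();
            if (nowMs - g.firstSeenMs >= timeoutMs) {
                out.add(g.events);
                it.remove();
            }
        }
        return out;
    }

    /** Events held only in memory: what a restart without checkpoints would lose. */
    int pendingEvents() {
        int n = 0;
        for (Group g : pending.values()) {
            n += g.events.size();
        }
        return n;
    }

    public static void main(String[] args) {
        GroupBufferSketch buf = new GroupBufferSketch(2, 5_000);
        buf.add("a", "a1", 0);
        buf.add("b", "b1", 100);
        System.out.println(buf.add("a", "a2", 200)); // prints [a1, a2]
        System.out.println(buf.pendingEvents());     // prints 1
        System.out.println(buf.flushExpired(6_000)); // prints [[b1]]
    }
}
```

Most groups complete and leave the buffer almost immediately, so at any moment only the stragglers are pending. Those stragglers, across every parallel operator, are what a full job restart throws away.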

Thanks, Ashish

On Mar 15, 2018, at 3:41 AM, Fabian Hueske <[hidden email]> wrote:

If I understand fine-grained recovery correctly, one would still need to take checkpoints.

Ashish would like to avoid checkpointing and accepts losing the state of the failed task.
However, he would like to avoid losing more state than necessary due to the restart of tasks that did not fail.

Best, Fabian


Re: Restart hook and checkpoint

makeyang
Currently there is only a time-based way to trigger a checkpoint. Based on this
discussion, I think Flink needs to introduce an event-based way to trigger
checkpoints; for example, the restart of a task manager should count as an event.




Re: Restart hook and checkpoint

Fabian Hueske-2
Well, that's not that easy to do, because checkpoints must be coordinated and triggered by the JobManager.
Also, the checkpointing mechanism with flowing checkpoint barriers (to ensure checkpoint consistency) won't work once a task has failed, because it cannot continue processing and forwarding barriers. If the task failed with an OOME, the whole JVM is gone anyway.
I don't think it is possible to take something like a consistent rescue checkpoint in case of a failure.
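
The barrier argument can be illustrated with a toy model. The sketch below is plain Java, not Flink internals: `BarrierAlignmentSketch` and its methods are hypothetical, and it keeps only the rule that a task snapshots its state once the barrier has arrived on every input channel.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model, hypothetical names: snapshot only after a barrier arrived on every input. */
class BarrierAlignmentSketch {
    private final int numInputs;
    private final Set<Integer> barriersSeen = new HashSet<>();
    private boolean snapshotTaken = false;

    BarrierAlignmentSketch(int numInputs) {
        this.numInputs = numInputs;
    }

    /** Called when the checkpoint barrier arrives on one input channel. */
    void onBarrier(int inputChannel) {
        barriersSeen.add(inputChannel);
        if (barriersSeen.size() == numInputs) {
            // In a real system the state would be written and the barrier forwarded here.
            snapshotTaken = true;
        }
    }

    boolean snapshotTaken() {
        return snapshotTaken;
    }

    public static void main(String[] args) {
        BarrierAlignmentSketch healthy = new BarrierAlignmentSketch(2);
        healthy.onBarrier(0);
        healthy.onBarrier(1);
        System.out.println(healthy.snapshotTaken()); // prints true

        // The upstream task feeding channel 1 has failed, so its barrier never arrives.
        BarrierAlignmentSketch stuck = new BarrierAlignmentSketch(2);
        stuck.onBarrier(0);
        System.out.println(stuck.snapshotTaken());   // prints false
    }
}
```

In this model a single failed upstream task is enough to keep every task downstream of it from ever completing its part of the checkpoint.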

It might be possible to checkpoint the application state of non-failed tasks, but this would result in data loss for the failed task, and we would need to weigh the use cases for such a feature against the implementation effort.
Maybe there are better ways to address such use cases.

Best, Fabian

2018-03-20 6:43 GMT+01:00 makeyang <[hidden email]>:
Currently there is only a time-based way to trigger a checkpoint. Based on this
discussion, I think Flink needs to introduce an event-based way to trigger
checkpoints; for example, the restart of a task manager should count as an event.


Re: Restart hook and checkpoint

Ashish Pokharel
I definitely like the idea of event based checkpointing :) 

Fabian, I do agree with your point that it is not possible to take a rescue checkpoint consistently. The basis here, however, is not the operator that actually failed. It’s to avoid data loss across the 100s (probably 1000s) of parallel operators which are being restarted and are “healthy”. We have 100k (nearing a million soon) elements pushing data. Losing a few seconds’ worth of data for a few of them is not good but “acceptable” as long as the damage can be controlled. Of course, we are going to use RocksDB + 2-phase commit with Kafka where we need exactly-once guarantees. The “fine-grained recovery” proposal (https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures) seems like a good start, at least from a damage-control perspective, but even with that it feels like something like an “event-based approach” could be done for the subset of the job graph that is “healthy”.

Thanks, Ashish


On Mar 20, 2018, at 9:53 AM, Fabian Hueske <[hidden email]> wrote:

Well, that's not that easy to do, because checkpoints must be coordinated and triggered by the JobManager.
Also, the checkpointing mechanism with flowing checkpoint barriers (to ensure checkpoint consistency) won't work once a task has failed, because it cannot continue processing and forwarding barriers. If the task failed with an OOME, the whole JVM is gone anyway.
I don't think it is possible to take something like a consistent rescue checkpoint in case of a failure.

It might be possible to checkpoint the application state of non-failed tasks, but this would result in data loss for the failed task, and we would need to weigh the use cases for such a feature against the implementation effort.
Maybe there are better ways to address such use cases.

Best, Fabian


Re: Restart hook and checkpoint

Fabian Hueske-2
Hi Ashish,

Agreed!
I think the right approach would be to gather the requirements and start a discussion on the dev mailing list.
Contributors and committers who are more familiar with the checkpointing and recovery internals should discuss a solution that can be integrated and doesn't break with the current mechanism.
For instance (not sure whether this is feasible or solves the problem) one could only do local checkpoints and not write to the distributed persistent storage. That would bring down checkpointing costs and the recovery life cycle would not need to be radically changed.

Best, Fabian

2018-03-20 22:56 GMT+01:00 Ashish Pokharel <[hidden email]>:
I definitely like the idea of event based checkpointing :) 

Fabian, I do agree with your point that it is not possible to take a rescue checkpoint consistently. The basis here, however, is not the operator that actually failed. It’s to avoid data loss across the 100s (probably 1000s) of parallel operators which are being restarted and are “healthy”. We have 100k (nearing a million soon) elements pushing data. Losing a few seconds’ worth of data for a few of them is not good but “acceptable” as long as the damage can be controlled. Of course, we are going to use RocksDB + 2-phase commit with Kafka where we need exactly-once guarantees. The “fine-grained recovery” proposal (https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures) seems like a good start, at least from a damage-control perspective, but even with that it feels like something like an “event-based approach” could be done for the subset of the job graph that is “healthy”.

Thanks, Ashish



Re: Restart hook and checkpoint

Ashish Pokharel
Fabian, that sounds good. Should I recap some bullets in an email and start a new thread then?

Thanks, Ashish

On Mar 22, 2018, at 5:16 AM, Fabian Hueske <[hidden email]> wrote:

Hi Ashish,

Agreed!
I think the right approach would be to gather the requirements and start a discussion on the dev mailing list.
Contributors and committers who are more familiar with the checkpointing and recovery internals should discuss a solution that can be integrated and doesn't break with the current mechanism.
For instance (not sure whether this is feasible or solves the problem) one could only do local checkpoints and not write to the distributed persistent storage. That would bring down checkpointing costs and the recovery life cycle would not need to be radically changed.

Best, Fabian


Re: Restart hook and checkpoint

Fabian Hueske-2
Yes, that would be great!

Thank you, Fabian

2018-03-23 3:06 GMT+01:00 Ashish Pokharel <[hidden email]>:
Fabian, that sounds good. Should I recap some bullets in an email and start a new thread then?

Thanks, Ashish

