Making job fail on Checkpoint Expired?

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

Making job fail on Checkpoint Expired?

Robin Cassan
Hi all,

I am wondering if there is a way to make a flink job fail (not cancel it) when one or several checkpoints have failed due to being expired (taking longer than the timeout) ?
I am using Flink 1.9.2 and have set `setTolerableCheckpointFailureNumber(1)` which doesn't do the trick. Looking into the CheckpointFailureManager.java class, it looks like this only works when the checkpoint failure reason is `CHECKPOINT_DECLINED`, but the number of failures isn't incremented on `CHECKPOINT_EXPIRED`.
Am I missing something?

Thanks!
Reply | Threaded
Open this post in threaded view
|

Re: Making job fail on Checkpoint Expired?

Timo Walther
Hi Robin,

this is a very good observation and maybe even unintended behavior.
Maybe Arvid in CC is more familiar with the checkpointing?

Regards,
Timo


On 02.04.20 15:37, Robin Cassan wrote:

> Hi all,
>
> I am wondering if there is a way to make a flink job fail (not cancel
> it) when one or several checkpoints have failed due to being expired
> (taking longer than the timeout) ?
> I am using Flink 1.9.2 and have set
> `*setTolerableCheckpointFailureNumber(1)*` which doesn't do the trick.
> Looking into the CheckpointFailureManager.java class, it looks like this
> only works when the checkpoint failure reason is
> `*CHECKPOINT_DECLINED*`, but the number of failures isn't incremented on
> `*CHECKPOINT_EXPIRED*`.
> Am I missing something?
>
> Thanks!

Reply | Threaded
Open this post in threaded view
|

Re: Making job fail on Checkpoint Expired?

Congxian Qiu
Currently, only checkpoint declined will be counted into `continuousFailureCounter`.
Could you please share why do you want the job to fail when checkpoint expired?

Best,
Congxian


Timo Walther <[hidden email]> 于2020年4月2日周四 下午11:23写道:
Hi Robin,

this is a very good observation and maybe even unintended behavior.
Maybe Arvid in CC is more familiar with the checkpointing?

Regards,
Timo


On 02.04.20 15:37, Robin Cassan wrote:
> Hi all,
>
> I am wondering if there is a way to make a flink job fail (not cancel
> it) when one or several checkpoints have failed due to being expired
> (taking longer than the timeout) ?
> I am using Flink 1.9.2 and have set
> `*setTolerableCheckpointFailureNumber(1)*` which doesn't do the trick.
> Looking into the CheckpointFailureManager.java class, it looks like this
> only works when the checkpoint failure reason is
> `*CHECKPOINT_DECLINED*`, but the number of failures isn't incremented on
> `*CHECKPOINT_EXPIRED*`.
> Am I missing something?
>
> Thanks!

Reply | Threaded
Open this post in threaded view
|

Re: Making job fail on Checkpoint Expired?

Robin Cassan
Hi Congxian,

Thanks for confirming! The reason I want this behavior is because we are currently investigating issues with checkpoints that keep getting timeouts after the job has been running for a few hours. We observed that, after a few timeouts, if the job was being restarted because of a lost TM for example, the next checkpoints would be working for a few more hours. However, if the job continues running and consuming more data, the next checkpoints will be even bigger and the chances of them completing in time are getting even thinner.
Crashing the job is not a viable solution I agree, but it would allow us to generate data during the time we investigate the root cause of the timeouts.

I believe that having the option to make the job restart after a few checkpoint timeouts would still help to avoid the snowball effect of incremental checkpoints being bigger and bigger if the checkpoints keep getting expired.

I'd love to get your opinion on this!

Thanks,
Robin

Le ven. 3 avr. 2020 à 11:17, Congxian Qiu <[hidden email]> a écrit :
Currently, only checkpoint declined will be counted into `continuousFailureCounter`.
Could you please share why do you want the job to fail when checkpoint expired?

Best,
Congxian


Timo Walther <[hidden email]> 于2020年4月2日周四 下午11:23写道:
Hi Robin,

this is a very good observation and maybe even unintended behavior.
Maybe Arvid in CC is more familiar with the checkpointing?

Regards,
Timo


On 02.04.20 15:37, Robin Cassan wrote:
> Hi all,
>
> I am wondering if there is a way to make a flink job fail (not cancel
> it) when one or several checkpoints have failed due to being expired
> (taking longer than the timeout) ?
> I am using Flink 1.9.2 and have set
> `*setTolerableCheckpointFailureNumber(1)*` which doesn't do the trick.
> Looking into the CheckpointFailureManager.java class, it looks like this
> only works when the checkpoint failure reason is
> `*CHECKPOINT_DECLINED*`, but the number of failures isn't incremented on
> `*CHECKPOINT_EXPIRED*`.
> Am I missing something?
>
> Thanks!

Reply | Threaded
Open this post in threaded view
|

Re: Making job fail on Checkpoint Expired?

Congxian Qiu
Hi Robin
Thanks for the detailed reply, and sorry for my late reply.
I think that your request to fail the whole job when continues checkpoint expired is valid, I've created an issue to track this[1]

For now, maybe the following steps can help you find out the reason of time out

1. You can find out the "not ack subtask" in checkpoint ui, (maybe it called A)
2. find out A is under backpressure now?
2.1. if A is under backpressure, please fix it
2.2 if A is not under backpressure, you can go to the tm log of A to find out something abnormal(maybe you need to enable the debug log in this step)

for the snapshot in TM side, it contains 1) barrier align (exactly-once mode, at least once no need to align the barrier); 2) synchronize procedure; 3)asynchronize procedure;

backpressure will affect step 1, too many timers/cpu consumption too high/disk utilization too high may affect step 2; 3) disk performance/network performance may affect step 3;


Robin Cassan <[hidden email]> 于2020年4月3日周五 下午8:35写道:
Hi Congxian,

Thanks for confirming! The reason I want this behavior is because we are currently investigating issues with checkpoints that keep getting timeouts after the job has been running for a few hours. We observed that, after a few timeouts, if the job was being restarted because of a lost TM for example, the next checkpoints would be working for a few more hours. However, if the job continues running and consuming more data, the next checkpoints will be even bigger and the chances of them completing in time are getting even thinner.
Crashing the job is not a viable solution I agree, but it would allow us to generate data during the time we investigate the root cause of the timeouts.

I believe that having the option to make the job restart after a few checkpoint timeouts would still help to avoid the snowball effect of incremental checkpoints being bigger and bigger if the checkpoints keep getting expired.

I'd love to get your opinion on this!

Thanks,
Robin

Le ven. 3 avr. 2020 à 11:17, Congxian Qiu <[hidden email]> a écrit :
Currently, only checkpoint declined will be counted into `continuousFailureCounter`.
Could you please share why do you want the job to fail when checkpoint expired?

Best,
Congxian


Timo Walther <[hidden email]> 于2020年4月2日周四 下午11:23写道:
Hi Robin,

this is a very good observation and maybe even unintended behavior.
Maybe Arvid in CC is more familiar with the checkpointing?

Regards,
Timo


On 02.04.20 15:37, Robin Cassan wrote:
> Hi all,
>
> I am wondering if there is a way to make a flink job fail (not cancel
> it) when one or several checkpoints have failed due to being expired
> (taking longer than the timeout) ?
> I am using Flink 1.9.2 and have set
> `*setTolerableCheckpointFailureNumber(1)*` which doesn't do the trick.
> Looking into the CheckpointFailureManager.java class, it looks like this
> only works when the checkpoint failure reason is
> `*CHECKPOINT_DECLINED*`, but the number of failures isn't incremented on
> `*CHECKPOINT_EXPIRED*`.
> Am I missing something?
>
> Thanks!

Reply | Threaded
Open this post in threaded view
|

Re: Making job fail on Checkpoint Expired?

Robin Cassan
Hello again Congxian,

Thank you so much for your advice, it is really helpful! We have managed to pinpoint that most of our problems occur because of disk pressure, most likely due to the usage of EBS, we will try again with local SSDs.
Digging deeper into the "snowball effect on incremental checkpoint timeouts", I am wondering two things:

- Would it help to use Concurrent Checkpointing? In our current tests, Flink waits for the previous checkpoint to finish before starting the next one. So if the previous one has expired, the next one will be twice as big. But if we enable concurrent checkpoints, is it correct to assume that the second checkpoint that the checkpoints sizes should be more consistent? More precisely, if a second checkpoint triggers during the first checkpoint, this will fix the size of the first checkpoint because new barriers are injected, and if the first checkpoint expires it would be retried with the same amount of data?

- I am also wondering if there is a way for long checkpoints to create backpressure on the rest of the stream? This would be a nice feature to have, since it would avoid the state growing too much when checkpointing takes time because of temporary network issues for example.

Thanks for your help!
Robin

Le mer. 8 avr. 2020 à 05:30, Congxian Qiu <[hidden email]> a écrit :
Hi Robin
Thanks for the detailed reply, and sorry for my late reply.
I think that your request to fail the whole job when continues checkpoint expired is valid, I've created an issue to track this[1]

For now, maybe the following steps can help you find out the reason of time out

1. You can find out the "not ack subtask" in checkpoint ui, (maybe it called A)
2. find out A is under backpressure now?
2.1. if A is under backpressure, please fix it
2.2 if A is not under backpressure, you can go to the tm log of A to find out something abnormal(maybe you need to enable the debug log in this step)

for the snapshot in TM side, it contains 1) barrier align (exactly-once mode, at least once no need to align the barrier); 2) synchronize procedure; 3)asynchronize procedure;

backpressure will affect step 1, too many timers/cpu consumption too high/disk utilization too high may affect step 2; 3) disk performance/network performance may affect step 3;


Robin Cassan <[hidden email]> 于2020年4月3日周五 下午8:35写道:
Hi Congxian,

Thanks for confirming! The reason I want this behavior is because we are currently investigating issues with checkpoints that keep getting timeouts after the job has been running for a few hours. We observed that, after a few timeouts, if the job was being restarted because of a lost TM for example, the next checkpoints would be working for a few more hours. However, if the job continues running and consuming more data, the next checkpoints will be even bigger and the chances of them completing in time are getting even thinner.
Crashing the job is not a viable solution I agree, but it would allow us to generate data during the time we investigate the root cause of the timeouts.

I believe that having the option to make the job restart after a few checkpoint timeouts would still help to avoid the snowball effect of incremental checkpoints being bigger and bigger if the checkpoints keep getting expired.

I'd love to get your opinion on this!

Thanks,
Robin

Le ven. 3 avr. 2020 à 11:17, Congxian Qiu <[hidden email]> a écrit :
Currently, only checkpoint declined will be counted into `continuousFailureCounter`.
Could you please share why do you want the job to fail when checkpoint expired?

Best,
Congxian


Timo Walther <[hidden email]> 于2020年4月2日周四 下午11:23写道:
Hi Robin,

this is a very good observation and maybe even unintended behavior.
Maybe Arvid in CC is more familiar with the checkpointing?

Regards,
Timo


On 02.04.20 15:37, Robin Cassan wrote:
> Hi all,
>
> I am wondering if there is a way to make a flink job fail (not cancel
> it) when one or several checkpoints have failed due to being expired
> (taking longer than the timeout) ?
> I am using Flink 1.9.2 and have set
> `*setTolerableCheckpointFailureNumber(1)*` which doesn't do the trick.
> Looking into the CheckpointFailureManager.java class, it looks like this
> only works when the checkpoint failure reason is
> `*CHECKPOINT_DECLINED*`, but the number of failures isn't incremented on
> `*CHECKPOINT_EXPIRED*`.
> Am I missing something?
>
> Thanks!

Reply | Threaded
Open this post in threaded view
|

Re: Making job fail on Checkpoint Expired?

Congxian Qiu
Hi Robin

Glad to hear that my reply can help.

From my side, I do not think concurrent checkpoints can help, because it may cause more disk pressure problems.

Currently, this is an issue[1] wants to support Unalign checkpoint, unaligned checkpoint wants to fix the problem of checkpoint under backpressure


Robin Cassan <[hidden email]> 于2020年4月9日周四 下午8:30写道:
Hello again Congxian,

Thank you so much for your advice, it is really helpful! We have managed to pinpoint that most of our problems occur because of disk pressure, most likely due to the usage of EBS, we will try again with local SSDs.
Digging deeper into the "snowball effect on incremental checkpoint timeouts", I am wondering two things:

- Would it help to use Concurrent Checkpointing? In our current tests, Flink waits for the previous checkpoint to finish before starting the next one. So if the previous one has expired, the next one will be twice as big. But if we enable concurrent checkpoints, is it correct to assume that the second checkpoint that the checkpoints sizes should be more consistent? More precisely, if a second checkpoint triggers during the first checkpoint, this will fix the size of the first checkpoint because new barriers are injected, and if the first checkpoint expires it would be retried with the same amount of data?

- I am also wondering if there is a way for long checkpoints to create backpressure on the rest of the stream? This would be a nice feature to have, since it would avoid the state growing too much when checkpointing takes time because of temporary network issues for example.

Thanks for your help!
Robin

Le mer. 8 avr. 2020 à 05:30, Congxian Qiu <[hidden email]> a écrit :
Hi Robin
Thanks for the detailed reply, and sorry for my late reply.
I think that your request to fail the whole job when continues checkpoint expired is valid, I've created an issue to track this[1]

For now, maybe the following steps can help you find out the reason of time out

1. You can find out the "not ack subtask" in checkpoint ui, (maybe it called A)
2. find out A is under backpressure now?
2.1. if A is under backpressure, please fix it
2.2 if A is not under backpressure, you can go to the tm log of A to find out something abnormal(maybe you need to enable the debug log in this step)

for the snapshot in TM side, it contains 1) barrier align (exactly-once mode, at least once no need to align the barrier); 2) synchronize procedure; 3)asynchronize procedure;

backpressure will affect step 1, too many timers/cpu consumption too high/disk utilization too high may affect step 2; 3) disk performance/network performance may affect step 3;


Robin Cassan <[hidden email]> 于2020年4月3日周五 下午8:35写道:
Hi Congxian,

Thanks for confirming! The reason I want this behavior is because we are currently investigating issues with checkpoints that keep getting timeouts after the job has been running for a few hours. We observed that, after a few timeouts, if the job was being restarted because of a lost TM for example, the next checkpoints would be working for a few more hours. However, if the job continues running and consuming more data, the next checkpoints will be even bigger and the chances of them completing in time are getting even thinner.
Crashing the job is not a viable solution I agree, but it would allow us to generate data during the time we investigate the root cause of the timeouts.

I believe that having the option to make the job restart after a few checkpoint timeouts would still help to avoid the snowball effect of incremental checkpoints being bigger and bigger if the checkpoints keep getting expired.

I'd love to get your opinion on this!

Thanks,
Robin

Le ven. 3 avr. 2020 à 11:17, Congxian Qiu <[hidden email]> a écrit :
Currently, only checkpoint declined will be counted into `continuousFailureCounter`.
Could you please share why do you want the job to fail when checkpoint expired?

Best,
Congxian


Timo Walther <[hidden email]> 于2020年4月2日周四 下午11:23写道:
Hi Robin,

this is a very good observation and maybe even unintended behavior.
Maybe Arvid in CC is more familiar with the checkpointing?

Regards,
Timo


On 02.04.20 15:37, Robin Cassan wrote:
> Hi all,
>
> I am wondering if there is a way to make a flink job fail (not cancel
> it) when one or several checkpoints have failed due to being expired
> (taking longer than the timeout) ?
> I am using Flink 1.9.2 and have set
> `*setTolerableCheckpointFailureNumber(1)*` which doesn't do the trick.
> Looking into the CheckpointFailureManager.java class, it looks like this
> only works when the checkpoint failure reason is
> `*CHECKPOINT_DECLINED*`, but the number of failures isn't incremented on
> `*CHECKPOINT_EXPIRED*`.
> Am I missing something?
>
> Thanks!