Checkpoint expired before completing

Checkpoint expired before completing

Steven Wu

org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 9353 expired before completing

I might know why this happened in the first place. Our sink operator does a synchronous HTTP POST, which had a 15-minute latency spike when this all started. This could block Flink threads and prevent checkpoints from completing in time. But I don't understand why checkpoints continued to fail after the HTTP POST latency returned to normal. There seems to be some lingering/cascading effect of previously failed checkpoints on future checkpoints. Only after I redeployed/restarted the job an hour later did checkpoints start to work again.

Would appreciate any suggestions/insights!

Thanks,
Steven
Re: Checkpoint expired before completing

Nico Kruber
Hi Steven,
by default, checkpoints time out after 10 minutes if you haven't used
CheckpointConfig#setCheckpointTimeout() to change this timeout.

Depending on your checkpoint interval, and your number of concurrent
checkpoints, there may already be some other checkpoint processes
running while you are waiting for the first to finish. In that case,
succeeding checkpoints may also fail with a timeout. However, they
should definitely get back to normal once your sink has caught up with
all buffered events.
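
As a rough illustration of the settings mentioned above, the following sketch sets the checkpoint interval, the timeout, and the number of concurrent checkpoints through the DataStream API. The concrete values, the class name, and the placeholder pipeline are assumptions for illustration only, and it needs the Flink streaming dependency on the classpath.

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch only: class name, values, and pipeline are hypothetical.
class CheckpointTimeoutSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 2 minutes (illustrative value).
        env.enableCheckpointing(2 * 60 * 1000L);

        CheckpointConfig config = env.getCheckpointConfig();
        // Override the 10-minute default timeout (illustrative value).
        config.setCheckpointTimeout(5 * 60 * 1000L);
        // Allow only one checkpoint to be in flight at a time.
        config.setMaxConcurrentCheckpoints(1);

        // Placeholder pipeline so the job has something to run.
        env.fromElements(1, 2, 3).print();
        env.execute("checkpoint-timeout-sketch");
    }
}
```

With more than one concurrent checkpoint allowed and an interval shorter than the checkpoint duration, several checkpoints can be pending at once, which is the overlap described above.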

I included Stefan who may shed some more light onto it, but maybe you
can help us identify the problem by providing logs at DEBUG level
(did Akka report any connection loss and gated actors? or maybe some
other error in there?) or even a minimal program to reproduce.


Nico

On 01/12/17 07:36, Steven Wu wrote:

>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint
> 9353 expired before completing
>
> I might know why this happened in the first place. Our sink operator
> does synchronous HTTP post, which had a 15-minute latency spike when this
> all started. This could block flink threads and prevent checkpoint from
> completing in time. But I don't understand why checkpoint continued to
> fail after HTTP post latency returned to normal. There seems to be some
> lingering/cascading effect of previous failed checkpoints on future
> checkpoints. Only after I redeploy/restart the job an hour later,
> checkpoint starts to work again.
>
> Would appreciate any suggestions/insights!
>
> Thanks,
> Steven


Re: Checkpoint expired before completing

Steven Wu
Here is the checkpoint config: no concurrent checkpoints, with a 2-minute checkpoint interval and timeout.

The problem is gone after redeployment. I will see whether I can reproduce the issue.

Inline image 1

On Fri, Dec 1, 2017 at 6:17 AM, Nico Kruber <[hidden email]> wrote:
Hi Steven,
by default, checkpoints time out after 10 minutes if you haven't used
CheckpointConfig#setCheckpointTimeout() to change this timeout.

Depending on your checkpoint interval, and your number of concurrent
checkpoints, there may already be some other checkpoint processes
running while you are waiting for the first to finish. In that case,
succeeding checkpoints may also fail with a timeout. However, they
should definitely get back to normal once your sink has caught up with
all buffered events.

I included Stefan who may shed some more light onto it, but maybe you
can help us identify the problem by providing logs at DEBUG level
(did Akka report any connection loss and gated actors? or maybe some
other error in there?) or even a minimal program to reproduce.


Nico

Re: Checkpoint expired before completing

Stephan Ewen
Hi Steven!

You are right, there could be some cascading effect from previous checkpoints.
I think the best way to handle that is to set the "minimum pause between checkpoints". In fact, I would actually recommend this over the checkpoint interval parameter.

The pause will allow the job to handle such effects that built up during an unhealthy checkpoint. You can for example set the checkpoint interval to 2 mins and set the pause to 1.5 mins. That way, if a checkpoint takes longer than usual, the next one will still wait for 1.5 mins after the previous one completed or expired, giving the job time to catch up.
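
The timing rule described above can be sketched as a small calculation. This is a simplified model, not Flink's scheduler code: the class and method names are hypothetical, and it assumes the next checkpoint starts at whichever is later, the regular interval tick or the end of the previous checkpoint plus the minimum pause. In a real job the pause is set with CheckpointConfig#setMinPauseBetweenCheckpoints().

```java
// Simplified model of checkpoint scheduling with a minimum pause.
// NOT Flink's implementation; names and the rule itself are assumptions.
final class MinPauseModel {
    private MinPauseModel() {}

    /**
     * Earliest time (ms) at which the next checkpoint may start.
     *
     * @param prevTriggerMillis when the previous checkpoint was triggered
     * @param prevEndMillis     when it completed or expired
     * @param intervalMillis    configured checkpoint interval
     * @param minPauseMillis    configured minimum pause between checkpoints
     */
    static long nextTriggerMillis(long prevTriggerMillis, long prevEndMillis,
                                  long intervalMillis, long minPauseMillis) {
        long byInterval = prevTriggerMillis + intervalMillis;
        long byPause = prevEndMillis + minPauseMillis;
        // The later of the two constraints wins.
        return Math.max(byInterval, byPause);
    }
}
```

With a 2-minute interval and a 1.5-minute pause, a checkpoint triggered at 0 that finishes after 10 seconds is followed by the next one at 120,000 ms under this model. If it instead drags on until 120,000 ms, the next one waits until 210,000 ms.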

Best,
Stephan


On Fri, Dec 1, 2017 at 10:10 PM, Steven Wu <[hidden email]> wrote:
Here is the checkpoint config: no concurrent checkpoints, with a 2-minute checkpoint interval and timeout.

The problem is gone after redeployment. I will see whether I can reproduce the issue.

Inline image 1

Re: Checkpoint expired before completing

Steven Wu
Stephan, thanks a lot for the explanation. Now everything makes sense to me. Will set the min pause.

On Sat, Dec 2, 2017 at 8:58 AM, Stephan Ewen <[hidden email]> wrote:
Hi Steven!

You are right, there could be some cascading effect from previous checkpoints.
I think the best way to handle that is to set the "minimum pause between checkpoints". In fact, I would actually recommend this over the checkpoint interval parameter.

The pause will allow the job to handle such effects that built up during an unhealthy checkpoint. You can for example set the checkpoint interval to 2 mins and set the pause to 1.5 mins. That way, if a checkpoint takes longer than usual, the next one will still wait for 1.5 mins after the previous one completed or expired, giving the job time to catch up.

Best,
Stephan


Re: Checkpoint expired before completing

Steven Wu
One more question. Since I have set "Maximum Concurrent Checkpoints" to 1, will the cascading effect still apply?

Whenever my sink operator returns to normal (in terms of latency), a new checkpoint after this point should work, right? There are no other in-flight/concurrent checkpoints still in progress.

Or is the min pause just allowing Flink to catch up on in-flight messages in various queues/buffers? Is that the cascading impact?

On Sat, Dec 2, 2017 at 2:10 PM, Steven Wu <[hidden email]> wrote:
Stephan, thanks a lot for the explanation. Now everything makes sense to me. Will set the min pause.

Re: Checkpoint expired before completing

Nico Kruber
Although there may be no checkpoints in flight with this configuration,
there are most certainly records floating around in various buffers
which filled up while your sink was pausing everything. Those records need
to be processed first before the new checkpoint's checkpoint barrier may
make it through (also see [1] for details on how the checkpointing works).

So yes, your second assumption is true, and that was what I meant by "get
back to normal once your sink has caught up with all buffered events" in
my first message, and I assume it is also what Stephan meant by the
cascading effect.
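
To make the explanation above concrete, here is a back-of-the-envelope model of a barrier queued behind buffered records. It is a sketch under stated assumptions, not Flink code: the class and method names are hypothetical, records are assumed to be processed strictly in order at a fixed per-record cost, and the numbers in the usage note are made up.

```java
// Simplified model: a checkpoint barrier waiting behind buffered records.
// NOT Flink's implementation; names and the FIFO/fixed-cost assumptions are illustrative.
final class BarrierBacklogModel {
    private BarrierBacklogModel() {}

    /** Time (ms) until the barrier reaches the sink, assuming FIFO processing. */
    static long barrierDelayMillis(long bufferedRecords, long millisPerRecord) {
        // multiplyExact throws on overflow instead of silently wrapping.
        return Math.multiplyExact(bufferedRecords, millisPerRecord);
    }

    /** True if the barrier would arrive only after the checkpoint timeout. */
    static boolean wouldExpire(long bufferedRecords, long millisPerRecord,
                               long timeoutMillis) {
        return barrierDelayMillis(bufferedRecords, millisPerRecord) > timeoutMillis;
    }
}
```

For example, with 50,000 buffered records at 5 ms each, the barrier needs 250,000 ms under this model, which exceeds a 2-minute (120,000 ms) timeout even though the sink latency itself is back to normal. With 10,000 buffered records it needs 50,000 ms and the checkpoint fits within the timeout.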


Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html

On 02/12/17 23:30, Steven Wu wrote:

> One more question. Since I have set "Maximum Concurrent Checkpoints"
> to 1, will the cascading effect still apply?
>
> Whenever my sink operator returns to normal (in terms of latency), a new
> checkpoint after this point should work, right? There are no other
> in-flight/concurrent checkpoints still in progress.
>
> Or is the min pause just allowing Flink to catch up on in-flight messages in
> various queues/buffers? Is that the cascading impact?