Understanding checkpoint behavior

PranjalChauhan
Hi,

I am a new Flink user, currently using Flink 1.2.1. I am trying to
understand how checkpoints actually work while a window operator is
processing events.

My pipeline has the following flow where each operator's parallelism is 1.
source -> flatmap -> tumbling window -> sink
In this pipeline, I had configured the window to be evaluated every 1 hour
(3600 seconds) and the checkpoint interval was 5 mins. The checkpoint
timeout was set to 1 hour as I wanted the checkpoints to complete.

In my window function, the job makes an HTTPS call to another service, so the
window function may take some time to evaluate/process all events.

Please refer to the following image. In this case, the window was triggered at
23:00:00. Checkpoint 12 was triggered soon after that, and I noticed that
checkpoint 12 takes a long time to complete (compared to other checkpoints,
when the window function is not processing events).
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1766/overall_checkpoint_duration_summary_when_waiting_for_window_operator.png>

The following images show the checkpoint 12 details for the window & sink operators.
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1766/window_operator_checkpoint_duration_after_window_interval.png>
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1766/sink_operator_checkpoint_duration_after_window_interval.png>

I see that the time spent on the checkpoint itself was just 5 ms & 8 ms
(checkpoint duration sync) for the window & sink operators. However, the End to
End Duration of the checkpoint was 11m 12s for both the window & sink operators.

Is this expected behavior? If yes, do you have any suggestions for reducing the
end-to-end checkpoint duration?

Please let me know if any more information is needed.

Thanks.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Understanding checkpoint behavior

Piotr Nowojski
Hi,

“Checkpoint duration sync” is only the time taken for the “synchronous” part of taking a snapshot of your operator. Your 11m most likely comes from the checkpoint barrier being stuck somewhere in your pipeline for that long before the snapshot, waiting while some record or batch of records was being processed.

If you write a simple function that only performs `Thread.sleep(new Random().nextInt(3600000))` and nothing else, your checkpoints will take a random amount of time, since snapshots cannot be taken while your function is executing some code. You can read about some of those concepts in the documentation:

https://ci.apache.org/projects/flink/flink-docs-stable/internals/stream_checkpointing.html
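
As a rough illustration of the point above, here is a minimal sketch in plain Java with no Flink dependency. It is a toy model, not Flink's implementation: the class `BarrierDelaySketch`, its `simulate` method and the record costs are all made up for this example. It treats the operator as a single thread that must finish the records queued ahead of the barrier before it can start the snapshot, and uses a simulated clock instead of real sleeps.

```java
final class BarrierDelaySketch {

    /** Timings of one simulated checkpoint, in milliseconds. */
    static final class Timings {
        final long syncMillis;      // time spent inside the snapshot itself
        final long endToEndMillis;  // time from checkpoint trigger to snapshot done

        Timings(long syncMillis, long endToEndMillis) {
            this.syncMillis = syncMillis;
            this.endToEndMillis = endToEndMillis;
        }
    }

    /**
     * Simulates a single-threaded operator. The checkpoint is triggered at
     * time 0, but its barrier is queued behind records that are processed
     * one at a time; the snapshot can only start once they are all done.
     */
    static Timings simulate(long[] recordCostsMillis, long snapshotCostMillis) {
        long clock = 0; // simulated clock, so nothing actually sleeps
        for (long cost : recordCostsMillis) {
            clock += cost; // the barrier waits while user code runs
        }
        long snapshotStart = clock;
        clock += snapshotCostMillis; // synchronous part of the snapshot
        return new Timings(clock - snapshotStart, clock);
    }

    public static void main(String[] args) {
        // Illustrative costs only: one slow window evaluation, then a 5 ms snapshot.
        Timings t = simulate(new long[] {671_995L}, 5L);
        System.out.println("sync = " + t.syncMillis + " ms");
        System.out.println("end to end = " + t.endToEndMillis + " ms");
    }
}
```

With these made-up inputs the sync part stays at 5 ms while the end-to-end time is 672000 ms (11m 12s), which has the same shape as the numbers reported in the original question.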

Piotrek

Btw, Flink 1.2.1 is a very old version that is no longer supported. One reason to upgrade is the set of improvements to the network stack in Flink 1.5.x, which were aimed in part at reducing checkpoint duration.

> On 5 Nov 2018, at 21:33, PranjalChauhan <[hidden email]> wrote:

Re: Understanding checkpoint behavior

PranjalChauhan
Thank you for your response Piotr. I plan to upgrade to Flink 1.5.x early
next year.

Two follow-up questions for now.

1.
" When operator snapshots are taken, there are two parts: the synchronous
and the asynchronous parts. "
I understand that while the operator snapshot is being taken, processing in
that operator is stopped, because taking this snapshot is the synchronous
part. Is there any other synchronous part in the snapshot / checkpoint process?


2.
Based on the test I mentioned above, my understanding is that for a window
operator, once all events that belong to checkpoint N and checkpoint
barrier N have been received by the window operator (but are waiting for the
window to be triggered), checkpoint barrier N is immediately emitted to the
sink operator (so the snapshot can be completed) while the events are still
waiting to be evaluated by the window operator.

Can you please confirm my understanding as I was initially confused by the
following second statement (emits all pending outgoing records) under
Barriers section in this doc
https://ci.apache.org/projects/flink/flink-docs-release-1.2/internals/stream_checkpointing.html
?

"When an intermediate operator has received a barrier for snapshot n from
all of its input streams, it emits itself a barrier for snapshot n into all
of its outgoing streams."

" Once the last stream has received barrier n, the operator emits all
pending outgoing records, and then emits snapshot n barriers itself. "

 Thanks,
Pranjal




Re: Understanding checkpoint behavior

Piotr Nowojski
Hi,

> On 6 Nov 2018, at 18:22, PranjalChauhan <[hidden email]> wrote:
>
> Thank you for your response Piotr. I plan to upgrade to Flink 1.5.x early
> next year.
>
> Two follow-up questions for now.
>
> 1.
> " When operator snapshots are taken, there are two parts: the synchronous
> and the asynchronous parts. "
> I understand that while the operator snapshot is being taken, processing in
> that operator is stopped, because taking this snapshot is the synchronous
> part. Is there any other synchronous part in the snapshot / checkpoint process?
>

Not as far as I know.

>
> 2.
> Based on the test I mentioned above, my understanding is that for a window
> operator, once all events that belong to checkpoint N and checkpoint
> barrier N have been received by the window operator (but are waiting for the
> window to be triggered), checkpoint barrier N is immediately emitted to the
> sink operator (so the snapshot can be completed) while the events are still
> waiting to be evaluated by the window operator.
>
> Can you please confirm my understanding as I was initially confused by the
> following second statement (emits all pending outgoing records) under
> Barriers section in this doc
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/internals/stream_checkpointing.html
> ?
>
> "When an intermediate operator has received a barrier for snapshot n from
> all of its input streams, it emits itself a barrier for snapshot n into all
> of its outgoing streams."
>
> " Once the last stream has received barrier n, the operator emits all
> pending outgoing records, and then emits snapshot n barriers itself. "

I think you might be mixing two different concepts: watermarks and checkpoint barriers. The documentation that you are quoting describes the checkpointing mechanism, checkpoint barriers and record alignment. Checkpoint barriers do not cause any results to be emitted from the WindowOperator; that happens when timers are triggered (wall-clock timers in the case of processing time, or watermarks in the case of event time).
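
To make the distinction concrete, here is a small toy model in plain Java. It is only a sketch under simplifying assumptions, not how Flink's WindowOperator is written: the class `WindowBarrierSketch`, its method names and the single fixed window are hypothetical. A barrier copies the buffered records into a snapshot and is forwarded downstream without emitting any result; only a watermark that reaches the window's last timestamp fires the window.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowBarrierSketch {
    private final long windowMaxTimestamp;                  // last timestamp inside the window
    private final List<Integer> buffered = new ArrayList<>();
    final List<String> downstream = new ArrayList<>();      // what the sink would receive

    WindowBarrierSketch(long windowMaxTimestamp) {
        this.windowMaxTimestamp = windowMaxTimestamp;
    }

    /** Records are only buffered until the window fires. */
    void onRecord(int value) {
        buffered.add(value);
    }

    /** A barrier snapshots the buffered state and is forwarded; no result is emitted. */
    List<Integer> onBarrier(long checkpointId) {
        List<Integer> snapshot = new ArrayList<>(buffered);
        downstream.add("barrier " + checkpointId);
        return snapshot;
    }

    /** A watermark at or past the end of the window fires it and emits the result. */
    void onWatermark(long watermark) {
        if (watermark >= windowMaxTimestamp && !buffered.isEmpty()) {
            int sum = 0;
            for (int v : buffered) {
                sum += v;
            }
            downstream.add("result " + sum);
            buffered.clear();
        }
    }

    public static void main(String[] args) {
        WindowBarrierSketch op = new WindowBarrierSketch(3_599_999L); // one-hour window
        op.onRecord(1);
        op.onRecord(2);
        List<Integer> snapshot = op.onBarrier(12L); // state is captured, window not fired
        op.onWatermark(3_599_999L);                 // now the window fires
        System.out.println(snapshot);               // [1, 2]
        System.out.println(op.downstream);          // [barrier 12, result 3]
    }
}
```

In this model the pending window contents become part of the checkpointed state, which is why the barrier can travel on to the sink before the window has been evaluated.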

Piotrek

Re: Understanding checkpoint behavior

Timo Walther
In reply to this post by PranjalChauhan
Hi,

Do you also observe such long checkpoint times without performing
external calls? If not, I guess the communication with the external system
is flaky.

Maybe you have to rethink how you perform such calls in order to make
the pipeline more robust against these latencies. Flink also offers an
async operator [1] for exactly such cases; this could be worth a look.

Regards,
Timo

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html
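
The general idea behind asynchronous calls can be sketched with the JDK's `CompletableFuture` alone. This is not Flink's async I/O API, only an illustration of why non-blocking calls help: the class `AsyncCallSketch`, the `submitAll` helper and the fake client are hypothetical stand-ins, and the "service" is completed by hand so the example stays deterministic. The processing thread hands off each request and returns immediately, so it remains free (for example, to handle a checkpoint barrier) while responses are outstanding.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class AsyncCallSketch {

    /** Submits every request without waiting and returns the in-flight futures in input order. */
    static List<CompletableFuture<String>> submitAll(
            List<String> requests, Function<String, CompletableFuture<String>> client) {
        List<CompletableFuture<String>> inFlight = new ArrayList<>();
        for (String request : requests) {
            inFlight.add(client.apply(request)); // returns immediately, no blocking
        }
        return inFlight;
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for an HTTPS client: it hands back a future
        // that stays incomplete until "the service" answers.
        List<CompletableFuture<String>> pending = new ArrayList<>();
        Function<String, CompletableFuture<String>> fakeClient = request -> {
            CompletableFuture<String> response = new CompletableFuture<>();
            pending.add(response);
            return response.thenApply(body -> request + " -> " + body);
        };

        List<CompletableFuture<String>> inFlight =
                submitAll(Arrays.asList("a", "b"), fakeClient);

        // All requests are outstanding, yet this thread is not blocked.
        System.out.println("none done yet: "
                + inFlight.stream().noneMatch(CompletableFuture::isDone));

        // Later the service answers and the results become available.
        pending.forEach(f -> f.complete("200 OK"));
        inFlight.forEach(f -> System.out.println(f.join()));
    }
}
```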

