Hi,
I am a new Flink user, currently on Flink 1.2.1. I am trying to understand how checkpoints actually work while a window operator is processing events.

My pipeline has the following flow, where each operator's parallelism is 1:

source -> flatmap -> tumbling window -> sink

In this pipeline, I configured the window to be evaluated every 1 hour (3600 seconds) and the checkpoint interval to be 5 minutes. The checkpoint timeout was set to 1 hour because I wanted the checkpoints to complete.

In my window function, the job makes an HTTPS call to another service, so the window function may take some time to evaluate/process all events.

Please refer to the following image. In this case, the window was triggered at 23:00:00. Checkpoint 12 was triggered soon after that, and I noticed that checkpoint 12 takes a long time to complete (compared to other checkpoints taken when the window function is not processing events).

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1766/overall_checkpoint_duration_summary_when_waiting_for_window_operator.png>

The following images show the checkpoint 12 details of the window and sink operators.

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1766/window_operator_checkpoint_duration_after_window_interval.png>
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1766/sink_operator_checkpoint_duration_after_window_interval.png>

I see that the time spent on the checkpoint was actually just 5 ms and 8 ms (checkpoint duration sync) for the window and sink operators. However, the End to End Duration for the checkpoint was 11m 12s for both the window and sink operators.

Is this expected behavior? If yes, do you have any suggestions for reducing the end-to-end checkpoint duration?

Please let me know if any more information is needed.

Thanks.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi,
"Checkpoint duration sync" is only the time taken for the "synchronous" part of taking a snapshot of your operator. Your 11m most likely comes from the checkpoint barrier being stuck somewhere in your pipeline for that long before the snapshot, while an operator was busy processing a record or a batch of records.

If you write a simple function that only performs `Thread.sleep(new Random().nextInt(3600000))` and nothing else, your checkpoints will take a random amount of time, since a snapshot cannot be taken while your function is executing.

You can read about some of these concepts in the documentation:
https://ci.apache.org/projects/flink/flink-docs-stable/internals/stream_checkpointing.html

Piotrek

Btw, Flink 1.2.1 is very old and no longer supported. One reason to upgrade is the set of improvements to the network stack in Flink 1.5.x, which were in part aimed at reducing checkpoint duration.

> On 5 Nov 2018, at 21:33, PranjalChauhan <[hidden email]> wrote:
> [...]
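To make the point about the barrier being stuck more concrete, here is a minimal plain-Java sketch of it. It does not use Flink at all: `BarrierDelaySketch`, `Element`, and `endToEndMillis` are hypothetical names, the operator is modelled as a single thread that handles its input strictly in order, and the durations are illustrative values chosen to resemble the numbers reported above, not measurements.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model, not Flink code: one task thread handles its input in order,
// so a checkpoint barrier cannot be handled before the work queued ahead of it.
class BarrierDelaySketch {

    /** One element of the operator's input: a record or a checkpoint barrier. */
    static final class Element {
        final boolean isBarrier;
        final long processingMillis; // simulated cost of handling a record; 0 for a barrier

        private Element(boolean isBarrier, long processingMillis) {
            this.isBarrier = isBarrier;
            this.processingMillis = processingMillis;
        }

        static Element slowRecord(long processingMillis) {
            return new Element(false, processingMillis);
        }

        static Element checkpointBarrier() {
            return new Element(true, 0L);
        }
    }

    /**
     * Returns the simulated time at which the snapshot finishes: the time spent on
     * everything queued ahead of the first barrier plus the synchronous snapshot
     * time. Returns -1 if the input contains no barrier.
     */
    static long endToEndMillis(List<Element> input, long syncSnapshotMillis) {
        long clock = 0L;
        for (Element e : input) {
            if (e.isBarrier) {
                return clock + syncSnapshotMillis;
            }
            clock += e.processingMillis; // the barrier waits while this record is handled
        }
        return -1L;
    }

    public static void main(String[] args) {
        // Three slow pieces of work (think: blocking HTTPS calls) ahead of the barrier.
        List<Element> input = Arrays.asList(
                Element.slowRecord(224_000L),
                Element.slowRecord(224_000L),
                Element.slowRecord(224_000L),
                Element.checkpointBarrier());
        System.out.println(endToEndMillis(input, 5L)); // prints 672005
    }
}
```

In this toy model the synchronous part is still only 5 ms, yet the end-to-end time is 672005 ms (roughly 11m 12s), because nearly all of it is spent waiting for the work queued in front of the barrier.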
Thank you for your response, Piotr. I plan to upgrade to Flink 1.5.x early next year.

Two follow-up questions for now.

1.
"When operator snapshots are taken, there are two parts: the synchronous and the asynchronous parts."

I understand that while the operator snapshot is being taken, processing in that operator is stopped, because taking the snapshot is the synchronous part. Is there any other synchronous part in the snapshot / checkpoint process?

2.
Based on the test I mentioned above, my understanding is that for a window operator, once all events that belong to checkpoint N and checkpoint barrier N have been received by the window operator (but are still waiting for the window to be triggered), checkpoint barrier N is immediately emitted to the sink operator (so the snapshot can be completed) while the events are still waiting to be evaluated by the window operator.

Can you please confirm my understanding? I was initially confused by the second statement below (emits all pending outgoing records) from the Barriers section of this doc:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/internals/stream_checkpointing.html

"When an intermediate operator has received a barrier for snapshot n from all of its input streams, it emits itself a barrier for snapshot n into all of its outgoing streams."

"Once the last stream has received barrier n, the operator emits all pending outgoing records, and then emits snapshot n barriers itself."

Thanks,
Pranjal
Hi,
> On 6 Nov 2018, at 18:22, PranjalChauhan <[hidden email]> wrote:
>
> Thank you for your response Piotr. I plan to upgrade to Flink 1.5.x early
> next year.
>
> Two follow-up questions for now.
>
> 1.
> " When operator snapshots are taken, there are two parts: the synchronous
> and the asynchronous parts. "
> I understand that when the operator snapshot is being taken, the processing
> of that operator is stopped as taking this snapshot is synchronous part. Is
> there any other synchronous part in the snapshot / checkpoint process?

Not as far as I know.

> 2.
> Based on the test I mentioned above, my understanding is that for a window
> operator, when all events that belongs to checkpoint N and the checkpoint
> barrier N are received by window operator (but pending for window to be
> triggered), then checkpoint barrier N will be immediately emitted to the
> sink operator (so snapshot can be completed) while the events are still
> pending to be evaluated by window operator.
>
> Can you please confirm my understanding as I was initially confused by the
> following second statement (emits all pending outgoing records) under
> Barriers section in this doc
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/internals/stream_checkpointing.html
> ?
>
> "When an intermediate operator has received a barrier for snapshot n from
> all of its input streams, it emits itself a barrier for snapshot n into all
> of its outgoing streams."
>
> " Once the last stream has received barrier n, the operator emits all
> pending outgoing records, and then emits snapshot n barriers itself. "

I think you might be mixing up two different concepts: watermarks and checkpoint barriers. The documentation you are quoting describes the checkpointing mechanism, checkpoint barriers, and record alignment.

Checkpoint barriers do not cause any results to be emitted from WindowOperator. Results are emitted when timers fire (wall-clock timers in the case of processing time, or watermarks in the case of event time).

Piotrek
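The distinction between barriers and timers can be illustrated with a small plain-Java model. This is only a sketch under simplifying assumptions, not Flink's WindowOperator: `WindowBarrierSketch` and its methods are hypothetical names, there is a single window with a fixed end time, and "time" stands in for either a processing-time timer or a watermark.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Flink's WindowOperator: a single window that sums integers.
class WindowBarrierSketch {
    private final long windowEnd;
    private final List<Integer> buffer = new ArrayList<>();   // window state
    final List<String> output = new ArrayList<>();            // what goes downstream
    final List<List<Integer>> snapshots = new ArrayList<>();  // state captured per barrier

    WindowBarrierSketch(long windowEnd) {
        this.windowEnd = windowEnd;
    }

    /** A record is only buffered; nothing is emitted yet. */
    void onRecord(int value) {
        buffer.add(value);
    }

    /** A barrier snapshots the buffered state and is forwarded; no window result is emitted. */
    void onBarrier(long checkpointId) {
        snapshots.add(new ArrayList<>(buffer));
        output.add("barrier-" + checkpointId);
    }

    /** Time passing the window end (timer or watermark) is what emits the window result. */
    void onTime(long now) {
        if (now >= windowEnd && !buffer.isEmpty()) {
            int sum = 0;
            for (int v : buffer) {
                sum += v;
            }
            output.add("window-sum-" + sum);
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        WindowBarrierSketch op = new WindowBarrierSketch(3600L);
        op.onRecord(1);
        op.onRecord(2);
        op.onBarrier(12L);  // state [1, 2] is snapshotted, barrier goes downstream
        op.onTime(3600L);   // only now is the window result emitted
        System.out.println(op.output);
    }
}
```

In this model the buffered records become part of the snapshot when the barrier arrives, and the barrier moves on without waiting for the window to fire. The window result appears only once time reaches the window end.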
In reply to this post by PranjalChauhan
Hi,
Do you also observe such long checkpoint times when the job does not perform external calls? If not, I guess the communication with the external system is flaky. Maybe you have to rethink how you perform such calls in order to make the pipeline more robust against these latencies.

Flink also offers an async operator [1] for exactly such cases; this could be worth a look.

Regards,
Timo

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html

On 05.11.18 at 18:52, PranjalChauhan wrote:
> [...]
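As a rough illustration of why issuing the external calls asynchronously helps, here is a plain-Java sketch using `CompletableFuture`. It is not Flink's async I/O API (see [1] for that); `AsyncCallsSketch`, `callAll`, and the `client` function are hypothetical stand-ins for the real HTTPS client. It also still blocks the caller until every call has finished, which Flink's async operator is designed to avoid.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical sketch, not Flink's async I/O API: overlap the waiting time of
// several external calls instead of paying for them one after another.
class AsyncCallsSketch {

    /** Issues one call per key on a bounded pool and returns the results in key order. */
    static List<String> callAll(List<String> keys, Function<String, String> client, int maxConcurrent) {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrent);
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (String key : keys) {
                // All calls are started before any result is awaited.
                futures.add(CompletableFuture.supplyAsync(() -> client.apply(key), pool));
            }
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> future : futures) {
                results.add(future.join()); // waits for this call; the others keep running
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Stand-in for a slow remote service.
        Function<String, String> slowClient = key -> {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "response-for-" + key;
        };
        System.out.println(callAll(Arrays.asList("a", "b", "c"), slowClient, 3));
    }
}
```

With three calls and a pool of three threads, the waits overlap, so the total wall-clock time is close to that of one call rather than the sum of all three. The pool size is an arbitrary choice here and would need to respect whatever load the external service can handle.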