Sync and Async checkpoint time


Sync and Async checkpoint time

Sofer, Tovi

Hi group,

 

In our project we are using the asynchronous FsStateBackend, and we are trying to move to distributed storage (currently S3).

When using this storage, we are experiencing high backpressure and high latency compared to local storage.

We are trying to understand the reason: since the checkpoint is asynchronous, it shouldn't have such a large effect.

 

We looked at the checkpoint history in the web UI and at the details in the log.

· From the web UI, it seems that the sync checkpoint duration is much higher than the async duration (again, only when using S3, not when using local storage).

This happens especially in window operators (tumbling windows), such as the ones below.

· But in the log, the sync time seems very short…

 

Do you have any idea why the asynchronous write to the FsStateBackend has such a large effect on stream performance?

 

Checkpoint config:

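env.enableCheckpointing(60000);  // trigger a checkpoint every 60 s
env.setStateBackend(new FsStateBackend(checkpointDirURI, true));  // true = asynchronous snapshots
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(20000);  // at least 20 s between the end of one checkpoint and the start of the next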
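A rough way to reason about how the two scheduling settings in this config interact is the simplified model below. It is an assumption for illustration, not Flink's exact scheduler: the next checkpoint starts at the later of "previous start + interval" and "previous end + minimum pause". The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of checkpoint scheduling (not Flink's CheckpointCoordinator).
// Assumption: the next checkpoint starts at the later of
//   (a) the previous start + checkpoint interval, and
//   (b) the previous end + minimum pause between checkpoints.
final class CheckpointSchedule {

    private CheckpointSchedule() {}

    static long nextStart(long prevStartMs, long durationMs, long intervalMs, long minPauseMs) {
        long byInterval = prevStartMs + intervalMs;              // next periodic trigger
        long byMinPause = prevStartMs + durationMs + minPauseMs; // previous end + pause
        return Math.max(byInterval, byMinPause);
    }

    // Start times of `count` consecutive checkpoints that all take `durationMs`.
    static List<Long> startTimes(int count, long durationMs, long intervalMs, long minPauseMs) {
        List<Long> starts = new ArrayList<>();
        long start = 0L;
        for (int i = 0; i < count; i++) {
            starts.add(start);
            start = nextStart(start, durationMs, intervalMs, minPauseMs);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Values from the config: 60 s interval, 20 s minimum pause.
        System.out.println(startTimes(3, 14_000L, 60_000L, 20_000L));
        System.out.println(startTimes(3, 50_000L, 60_000L, 20_000L));
    }
}
```

Assuming a checkpoint takes about 14 s end to end (in line with the asynchronous durations in the log excerpt of this post), the 60 s interval dominates and checkpoints start at 0, 60000 and 120000 ms. Only a checkpoint longer than 40 s would let the 20 s pause push later checkpoints out (0, 70000 and 140000 ms for a 50 s checkpoint).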

 

 

· Checkpoint info from console:

[image004.png: checkpoint details from the web console]

 

· Checkpoint info from log:

2018-01-30 07:33:36,416 INFO  org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-42-thread-1] DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-42-thread-1,5,Flink Task Threads] took 12139 ms.

2018-01-30 07:33:36,418 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-83] Received acknowledge message for checkpoint 52 from task 19ae368e935f177b513577256505ff37 of job 747c4cef2841d2ab090d9ed97e0357cc.

2018-01-30 07:33:36,676 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-35-thread-1] Heap backend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-35-thread-1,5,Flink Task Threads] took 12396 ms.

2018-01-30 07:33:36,677 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-79] Received acknowledge message for checkpoint 52 from task 19bf0464a13f3c9bd559b7559d166de2 of job 747c4cef2841d2ab090d9ed97e0357cc.

2018-01-30 07:33:37,347 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-17-thread-1] Heap backend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-17-thread-1,5,Flink Task Threads] took 13067 ms.

2018-01-30 07:33:37,349 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-79] Received acknowledge message for checkpoint 52 from task 97e169eefde0c57f52a874dee5a0b5a2 of job 747c4cef2841d2ab090d9ed97e0357cc.

2018-01-30 07:33:37,418 INFO  org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-29-thread-1] DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-29-thread-1,5,Flink Task Threads] took 13143 ms.

2018-01-30 07:33:37,420 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-79] Received acknowledge message for checkpoint 52 from task 19b990204530184a97fbaad373dfaf11 of job 747c4cef2841d2ab090d9ed97e0357cc.

2018-01-30 07:33:37,508 INFO  org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-33-thread-1] DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-33-thread-1,5,Flink Task Threads] took 13234 ms.

2018-01-30 07:33:37,509 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-79] Received acknowledge message for checkpoint 52 from task 5012eda50495ae33ca38e2478b3f8e0d of job 747c4cef2841d2ab090d9ed97e0357cc.

2018-01-30 07:33:37,589 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [ParsedOrdersDelayWindow (2/4)] Heap backend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, synchronous part) in thread Thread[ParsedOrdersDelayWindow (2/4),5,Flink Task Threads] took 1 ms.

2018-01-30 07:33:37,678 INFO  org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-49-thread-1] DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-49-thread-1,5,Flink Task Threads] took 13403 ms.

2018-01-30 07:33:37,680 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-79] Received acknowledge message for checkpoint 52 from task 6c26b698209523f6a0c77191b2bcb491 of job 747c4cef2841d2ab090d9ed97e0357cc.

2018-01-30 07:33:38,143 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-25-thread-1] Heap backend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-25-thread-1,5,Flink Task Threads] took 13863 ms.
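To compare the two views, it can help to pull the per-thread snapshot timings out of the log. The Java sketch below is hypothetical: the class and method names are made up, and it assumes the exact message wording shown in this excerpt. It separates "synchronous part" entries from "asynchronous part" entries. In this log they are about 1 ms versus 12 to 14 s. These messages come from the state backend classes (HeapKeyedStateBackend, DefaultOperatorStateBackend), so any checkpoint work done outside those classes would not appear in them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts snapshot phase timings from log lines in this format.
final class SnapshotLogTimings {

    // One "(..., synchronous|asynchronous part) in thread Thread[...] took N ms" entry.
    static final class Timing {
        final String thread;
        final boolean async;
        final long millis;

        Timing(String thread, boolean async, long millis) {
            this.thread = thread;
            this.async = async;
            this.millis = millis;
        }
    }

    // Assumes the exact message wording seen in this log excerpt; other Flink versions may differ.
    private static final Pattern SNAPSHOT = Pattern.compile(
            ", (asynchronous|synchronous) part\\) in thread Thread\\[([^,\\]]+)[^\\]]*\\] took (\\d+) ms");

    // Returns one Timing per matching line; other lines (e.g. acknowledge messages) are skipped.
    static List<Timing> parse(List<String> lines) {
        List<Timing> timings = new ArrayList<>();
        for (String line : lines) {
            Matcher m = SNAPSHOT.matcher(line);
            if (m.find()) {
                boolean async = m.group(1).equals("asynchronous");
                timings.add(new Timing(m.group(2), async, Long.parseLong(m.group(3))));
            }
        }
        return timings;
    }

    // Largest duration among the synchronous (async == false) or asynchronous (async == true) parts.
    static long maxMillis(List<Timing> timings, boolean async) {
        long max = 0L;
        for (Timing t : timings) {
            if (t.async == async) {
                max = Math.max(max, t.millis);
            }
        }
        return max;
    }

    public static void main(String[] args) {
        // Lines copied from the log excerpt; the acknowledge line is ignored by parse().
        List<String> lines = Arrays.asList(
            "2018-01-30 07:33:37,589 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [ParsedOrdersDelayWindow (2/4)] Heap backend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, synchronous part) in thread Thread[ParsedOrdersDelayWindow (2/4),5,Flink Task Threads] took 1 ms.",
            "2018-01-30 07:33:37,680 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-79] Received acknowledge message for checkpoint 52 from task 6c26b698209523f6a0c77191b2bcb491 of job 747c4cef2841d2ab090d9ed97e0357cc.",
            "2018-01-30 07:33:38,143 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-25-thread-1] Heap backend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-25-thread-1,5,Flink Task Threads] took 13863 ms.");
        for (Timing t : parse(lines)) {
            System.out.printf("%s %s %d ms%n", t.thread, t.async ? "async" : "sync", t.millis);
        }
    }
}
```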

 

Thanks & regards,

Tovi

 


Re: Sync and Async checkpoint time

Stefan Richter
Hi,

this looks like the timer service is the culprit. Timers are currently not stored in the state backend but in a separate on-heap data structure that does not support copy-on-write or asynchronous snapshots in general. Therefore, writing the timers for a snapshot is always synchronous. This explanation would also match your observation that the problem mainly affects window operators, which make heavy use of timers.

Best,
Stefan
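A toy Java model of the difference described here may help. The classes and methods below are hypothetical and greatly simplified, not Flink's internals. Keyed state that supports copy-on-write only needs to pin its current version on the task thread, and the slow write can then run on another thread. A plain mutable timer queue has to be written to the checkpoint stream on the task thread itself, so the cost of a slow target such as S3 lands in the synchronous part.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model only (hypothetical classes, not Flink internals).
final class SnapshotModel {

    // Keyed state with copy-on-write: every update publishes a new immutable map.
    static final class KeyedStateCow {
        private volatile Map<String, Long> current = Collections.emptyMap();

        void put(String key, long value) {
            Map<String, Long> next = new HashMap<>(current); // full copy per write, for clarity only
            next.put(key, value);
            current = Collections.unmodifiableMap(next);
        }

        // Synchronous part: O(1), just pins the current version for the snapshot.
        Map<String, Long> pinSnapshot() {
            return current;
        }
    }

    // Asynchronous part: serialize the pinned version on an I/O thread (stands in for an S3 upload).
    static CompletableFuture<Integer> writeSnapshotAsync(Map<String, Long> pinned, ExecutorService io) {
        return CompletableFuture.supplyAsync(() -> {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                for (Map.Entry<String, Long> entry : pinned.entrySet()) {
                    out.writeUTF(entry.getKey());
                    out.writeLong(entry.getValue());
                }
            } catch (IOException ex) {
                throw new UncheckedIOException(ex);
            }
            return pinned.size();
        }, io);
    }

    // Timers in a plain mutable heap with no copy-on-write support.
    static final class TimerQueue {
        private final PriorityQueue<Long> timers = new PriorityQueue<>();

        void register(long timestamp) {
            timers.add(timestamp);
        }

        // Synchronous part: every timer is written to the checkpoint stream on the calling
        // (task) thread, so a slow stream delays record processing for the whole write.
        int writeSnapshotSync(DataOutputStream checkpointStream) {
            try {
                checkpointStream.writeInt(timers.size());
                for (long t : timers) {
                    checkpointStream.writeLong(t);
                }
            } catch (IOException ex) {
                throw new UncheckedIOException(ex);
            }
            return timers.size();
        }
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newSingleThreadExecutor();
        KeyedStateCow state = new KeyedStateCow();
        TimerQueue timers = new TimerQueue();
        state.put("order-1", 1L);
        timers.register(60_000L);

        // Checkpoint barrier arrives: both synchronous parts run on this (task) thread.
        Map<String, Long> pinned = state.pinSnapshot();
        int timersWritten = timers.writeSnapshotSync(new DataOutputStream(new ByteArrayOutputStream()));
        CompletableFuture<Integer> stateWritten = writeSnapshotAsync(pinned, io);

        // Processing continues; the pinned version is not affected by this update.
        state.put("order-2", 2L);

        System.out.println("state entries written: " + stateWritten.join() + ", timers written: " + timersWritten);
        io.shutdown();
    }
}
```

In this model, the synchronous cost grows with the number of registered timers and with the latency of the checkpoint stream. That is consistent with window operators, which register many timers, being the ones that suffer most.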

On 30.01.2018 at 18:17, Sofer, Tovi <[hidden email]> wrote:

[quoted text hidden]

RE: Sync and Async checkpoint time

Sofer, Tovi

Hi Stefan,

 

Thank you for the answer.

So you mean that any use of windows in the stream will result in synchronous snapshotting?

When are you planning to fix this?  

And is there a workaround?

 

Thanks again,

Tovi

From: Stefan Richter [mailto:[hidden email]]
Sent: Tuesday, 30 January 2018 21:10
To: Sofer, Tovi [ICG-IT] <[hidden email]>
Cc: [hidden email]
Subject: Re: Sync and Async checkpoint time

 

[quoted text hidden]

Re: Sync and Async checkpoint time

Stefan Richter
Hi,

there is currently no workaround for this limitation if your operator uses timers, but it is pretty high on our TODO list for release 1.6.

Best,
Stefan

On 31.01.2018 at 09:29, Sofer, Tovi <[hidden email]> wrote:

[quoted text hidden]