state size effects latency

Sofer, Tovi

Hi all,

 

In our application we have a requirement for very low latency, preferably less than 5 ms.

We have been able to achieve this so far, but when we start increasing the state size, we see a distinct increase in latency.

We have added MinPauseBetweenCheckpoints and are using async snapshots.

·         Why does state size have such a distinct effect on latency? How can this effect be minimized?

·         Can the state snapshot be taken using separate threads and resources, so that it has less effect on stream data handling?

 

 

Details:

 

Application configuration:

env.enableCheckpointing(1000); // checkpoint interval in ms

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); // minimum pause between checkpoints in ms

env.setStateBackend(new FsStateBackend(checkpointDirURI, true)); // true = use async snapshots

env.setParallelism(16); // running on a machine with 40 cores

 

Results:

 

A.      When state size is ~20MB, latency is 0.3 ms at the 99th percentile

 

Latency info (in nanoseconds):

2017-10-26 07:26:55,030 INFO  com.citi.artemis.flink.reporters.Log4JReporter - [Flink-MetricRegistry-1] localhost.taskmanager.6afd21aeb9b9bef41a4912b023469497.Flink Streaming Job.AverageE2ELatencyChecker.0.LatencyHistogram: count:10000 min:31919 max:13481166 mean:89492.0644 stddev:265876.0259763816 p50:68140.5 p75:82152.5 p95:146654.0499999999 p98:204671.74 p99:308958.73999999993 p999:3844154.002999794

State\checkpoint info:

<image001.png>

 

B.      When state size is ~200MB, latency increased significantly to 9 ms at the 99th percentile

Latency info (in nanoseconds):

2017-10-26 07:17:35,289 INFO  com.citi.artemis.flink.reporters.Log4JReporter - [Flink-MetricRegistry-1] localhost.taskmanager.05431e7ecab1888b2792265cdc0ddf84.Flink Streaming Job.AverageE2ELatencyChecker.0.LatencyHistogram: count:10000 min:30186 max:46236470 mean:322105.7072 stddev:2060373.4782505725 p50:68979.5 p75:85780.25 p95:219882.69999999914 p98:2360171.4399999934 p99:9251766.559999945 p999:3.956163987499886E7
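
As a rough aid for reading the two histogram lines, the sketch below pulls the fields out of a reporter line and converts nanoseconds to milliseconds. The class name HistogramLine and the regular expression are assumptions made for illustration; they are not part of Flink or of the reporter used in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: extracts "name:value" fields from a histogram log line. */
class HistogramLine {
    // Matches tokens such as "p99:308958.73999999993" or "p999:3.956163987499886E7".
    private static final Pattern FIELD =
        Pattern.compile("\\b(count|min|max|mean|stddev|p\\d+):([0-9.]+(?:E[0-9]+)?)");

    /** Returns the fields in the order they appear in the line. */
    static Map<String, Double> parse(String line) {
        Map<String, Double> fields = new LinkedHashMap<>();
        Matcher m = FIELD.matcher(line);
        while (m.find()) {
            fields.put(m.group(1), Double.parseDouble(m.group(2)));
        }
        return fields;
    }

    static double nanosToMillis(double nanos) {
        return nanos / 1_000_000.0;
    }

    public static void main(String[] args) {
        // Shortened copy of the ~200MB line; values are in nanoseconds.
        String line = "LatencyHistogram: count:10000 min:30186 max:46236470 "
            + "mean:322105.7072 p99:9251766.559999945 p999:3.956163987499886E7";
        Map<String, Double> f = parse(line);
        System.out.printf("p99 = %.2f ms, p999 = %.2f ms%n",
            nanosToMillis(f.get("p99")), nanosToMillis(f.get("p999")));
    }
}
```

Read this way, the p99 of 9251766.56 ns in the ~200MB run is about 9.25 ms, against about 0.31 ms in the ~20MB run, while p50 and p75 barely move. The regression is concentrated in the tail.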

State\checkpoint info:

<image002.png>

 

Thanks and regards,

Tovi

 

Re: state size effects latency

Narendra Joshi

We have also faced similar issues. The only thing that happens synchronously when using async snapshots is capturing a persistent point-in-time picture of the state, which in the case of the RocksDB backend means creating symlinks. That cost would increase linearly with the number of files to symlink, but it should be negligible. We could not find a satisfying reason for the increase in latency with state size.

Best,
Narendra

Narendra Joshi

In reply to the post by Sofer, Tovi of 29 Oct 2017 15:04

RE: state size effects latency

Sofer, Tovi

Thank you, Joshi.

We are currently using FsStateBackend, not RocksDB, since it supports async snapshots as of version 1.3.

 

Does anyone else have feedback on this issue?

 

In reply to the post by Narendra Joshi of Sunday, 29 October 2017 12:13

Re: state size effects latency

Biplob Biswas
Hi Tovi,

This might seem a really naive question (and it's neither a solution nor an answer to your question), but I am trying to understand how latency is viewed. You said you achieved less than 5 ms latency, and that at the 99th percentile you achieved 0.3 ms and 9 ms respectively. What kind of latency is this? Specific operator latency? I ask because the end-to-end latency is around 50 ms and 370 ms.

I was just curious how latency is seen from a different perspective; it would really help my understanding.

Thanks a lot,
Biplob

Thanks & Regards
Biplob Biswas

In reply to the post by Sofer, Tovi of Mon, Oct 30, 2017 at 8:53 AM


RE: state size effects latency

Sofer, Tovi

Hi Biplob,

 

We have created our own latency meter histogram, which records the latency from ingestion time until the last operator.

This is shown in the log below (99th percentile and mean value), and our estimates are based on it.

The latency you mentioned is from the checkpoint tab, which shows checkpoint latency. It is different from record latency.

Actually, we also tried to use LatencyMarker, but didn’t know how to derive the E2E latency from it in a simple manner.

2017-10-26 07:26:55,030 INFO  com.citi.artemis.flink.reporters.Log4JReporter - [Flink-MetricRegistry-1] localhost.taskmanager.6afd21aeb9b9bef41a4912b023469497.Flink Streaming Job.AverageE2ELatencyChecker.0.LatencyHistogram: count:10000 min:31919 max:13481166 mean:89492.0644 stddev:265876.0259763816 p50:68140.5 p75:82152.5 p95:146654.0499999999 p98:204671.74 p99:308958.73999999993 p999:3844154.002999794
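
A histogram like the one above could be fed roughly as follows. This is a minimal sketch, not the code used in this thread: LatencyRecorder and its methods are hypothetical names, the ingestion timestamp is assumed to travel with each record, and both timestamps are assumed to come from the same JVM, since System.nanoTime() values are not comparable across processes.

```java
import java.util.Arrays;

/** Hypothetical recorder for per-record latency samples, in nanoseconds. */
class LatencyRecorder {
    private final long[] samples;
    private int size = 0;

    LatencyRecorder(int capacity) {
        samples = new long[capacity];
    }

    /** Stores (time at last operator) - (time at ingestion); drops samples beyond capacity. */
    void record(long ingestNanos, long lastOperatorNanos) {
        if (size < samples.length) {
            samples[size++] = lastOperatorNanos - ingestNanos;
        }
    }

    /** Nearest-rank percentile for p in (0, 100]. */
    long percentile(double p) {
        if (size == 0) {
            throw new IllegalStateException("no samples recorded");
        }
        long[] sorted = Arrays.copyOf(samples, size);
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p * size / 100.0);
        return sorted[Math.max(rank, 1) - 1];
    }

    /** Arithmetic mean of the recorded samples. */
    double mean() {
        long sum = 0;
        for (int i = 0; i < size; i++) {
            sum += samples[i];
        }
        return size == 0 ? 0.0 : (double) sum / size;
    }

    public static void main(String[] args) {
        LatencyRecorder recorder = new LatencyRecorder(10_000);
        for (int i = 0; i < 10_000; i++) {
            long ingest = System.nanoTime();   // would be stamped at the source
            long done = System.nanoTime();     // would be read in the last operator
            recorder.record(ingest, done);
        }
        System.out.println("p99 (ns): " + recorder.percentile(99));
    }
}
```

In a job, record() would be called in the last operator with the timestamp carried by the record, and the percentiles reported once per window of samples (the log lines above use count:10000).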

Tovi

 

In reply to the post by Biplob Biswas of Monday, 30 October 2017 11:02


Re: state size effects latency

Stefan Richter
In reply to this post by Sofer, Tovi
Hi,

I think there are a couple of potential explanations for the increased latency. Let me point out two of the most obvious that come to my mind:

1) A state size of 20 MB sounds like something that could (completely or to a large extent) fit into some cache layer of a modern CPU, whereas 200 MB does not. Please also note that the reported size of a checkpoint can be very different from the actual size in memory, and is typically smaller (compactly serialized bytes vs. objects, references, … on the heap). Depending on your architecture, setup, access pattern, etc., this could mean that the hot path of your code is served from the cache in one case and has to access main memory in the other. You could test this hypothesis by collecting more data points between 20 MB and 200 MB, and beyond. If you observe plateaus in your latency measurements across ranges of sizes, with relatively sharp jumps between the plateaus, this could indicate operating within vs. outside of some cache.

2) In async mode, the backend applies copy-on-write to track modifications that run concurrently with the checkpoint. If your checkpoints are bigger, the phase in which copy-on-write has to be applied is longer, and the total number of objects that could be copied is larger. In theory, this can introduce latency, but I don’t have numbers on this effect. However, this can easily be checked by deactivating checkpoints.
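
To make the copy-on-write cost in point 2) concrete, here is a deliberately simplified sketch. It is an illustration only and not Flink's implementation: CowStateTable, its methods, and the use of long[] values are all assumptions. The point it shows is that while a snapshot is in flight, the first update to each entry pays for a copy on the record-processing path.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified copy-on-write state table; an illustration, not Flink's implementation. */
class CowStateTable {
    private static final class Entry {
        long[] value;
        int version; // snapshot epoch in which this value was last written or copied

        Entry(long[] value, int version) {
            this.value = value;
            this.version = version;
        }
    }

    private final Map<String, Entry> table = new HashMap<>();
    private int epoch = 0;
    private boolean snapshotRunning = false;
    private long copies = 0;

    void put(String key, long[] value) {
        table.put(key, new Entry(value, epoch));
    }

    /** Synchronous part: captures references only, without deep-copying any value. */
    Map<String, long[]> beginSnapshot() {
        epoch++;
        snapshotRunning = true;
        Map<String, long[]> view = new HashMap<>();
        for (Map.Entry<String, Entry> e : table.entrySet()) {
            view.put(e.getKey(), e.getValue().value);
        }
        return view;
    }

    /** Called once the asynchronous writer has finished with the captured view. */
    void endSnapshot() {
        snapshotRunning = false;
    }

    /** Returns a value that is safe to mutate, copying it first if the snapshot still references it. */
    long[] getForUpdate(String key) {
        Entry e = table.get(key);
        if (e == null) {
            return null;
        }
        if (snapshotRunning && e.version < epoch) {
            e.value = e.value.clone(); // extra work paid while processing a record
            e.version = epoch;
            copies++;
        }
        return e.value;
    }

    long copies() {
        return copies;
    }
}
```

In this model, a longer-running snapshot keeps snapshotRunning set for longer, so more distinct keys are touched and copied before endSnapshot() is reached, and larger values make each copy more expensive.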

Another question is how you scale up your state: do you introduce more state per key, more keys without changing their state size, or a mix of both? There are obviously many more potential explanations, ranging from introduced skew to NUMA effects on your 40-core CPU. But the points above should be the most obvious candidates.
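
The cache hypothesis in point 1) can also be probed outside Flink. The following is a rough sketch of a pointer-chasing micro-benchmark over growing working sets. WorkingSetProbe and its parameters are made up for this example, the numbers it prints depend heavily on the JIT and the hardware, and it is only meant to show whether plateaus and jumps appear on the target machine.

```java
import java.util.Random;

/** Hypothetical probe: average time per dependent random access for a given working-set size. */
class WorkingSetProbe {
    /** Builds a random single-cycle permutation (Sattolo's algorithm) so each load depends on the previous one. */
    static int[] randomCycle(int n, long seed) {
        int[] next = new int[n];
        for (int i = 0; i < n; i++) {
            next[i] = i;
        }
        Random rnd = new Random(seed);
        for (int i = n - 1; i > 0; i--) {
            int j = rnd.nextInt(i); // j in [0, i), which guarantees a single cycle
            int tmp = next[i];
            next[i] = next[j];
            next[j] = tmp;
        }
        return next;
    }

    /** Follows the cycle for the given number of steps and returns nanoseconds per access. */
    static double nanosPerAccess(int[] next, int accesses) {
        int pos = 0;
        long start = System.nanoTime();
        for (int i = 0; i < accesses; i++) {
            pos = next[pos];
        }
        long elapsed = System.nanoTime() - start;
        if (pos < 0) {
            System.out.println(pos); // never true; keeps the loop from being optimized away
        }
        return (double) elapsed / accesses;
    }

    public static void main(String[] args) {
        // Working sets from 1 MB to 256 MB of int data; the largest needs a heap well above 256 MB.
        for (int mb = 1; mb <= 256; mb *= 2) {
            int[] next = randomCycle(mb * 1024 * 1024 / 4, 42L);
            nanosPerAccess(next, 1_000_000); // warm-up run
            System.out.printf("%4d MB: %.1f ns/access%n", mb, nanosPerAccess(next, 5_000_000));
        }
    }
}
```

If the per-access time stays flat up to some size and then steps up, that size is a candidate for the cache boundary to compare against the state sizes at which the job's latency changes.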

Best,
Stefan
