Lower Parallelism derives better latency


Lower Parallelism derives better latency

liron

Hi group,

 

We have a standalone Flink cluster that is running on a UNIX host with 40 CPUs and 256 GB RAM.
There is one task manager, and 24 slots were defined.

When we decrease the parallelism of the stream graph operators (each operator has the same parallelism),
we see a consistent improvement in latency:

Test run | Parallelism | 99th percentile | 95th percentile | 75th percentile | Mean
#1       | 8           | 4.15 ms         | 2.02 ms         | 0.22 ms         | 0.42 ms
#2       | 7           | 3.6 ms          | 1.68 ms         | 0.14 ms         | 0.34 ms
#3       | 6           | 3 ms            | 1.4 ms          | 0.13 ms         | 0.3 ms
#4       | 5           | 2.1 ms          | 0.95 ms         | 0.1 ms          | 0.22 ms
#5       | 4           | 1.5 ms          | 0.64 ms         | 0.09 ms         | 0.16 ms

 

This was a surprise for us, as we expected that higher parallelism would yield better latency.

Could you help us understand this behavior?
I know that when more threads are involved there is probably more serialization/deserialization, but that can't be the only reason for this behavior.

 

We have two Kafka sources, and the rest of the operators are fixed windows, flatMaps, coMappers, and several keyBys.

Except for the Kafka sources and some internal logging, there is no other I/O (i.e., we do not connect to any external DB/queue).

We use Flink 1.3.
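
For reference, a minimal sketch of a job with this general shape (Flink 1.3 DataStream API; the event type, schema, topic names, and operator classes below are made up for illustration) looks roughly like this, with the same parallelism applied to every operator:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(parallelism);   // varied between 4 and 8 in the test runs above
    env.setBufferTimeout(0);           // flush network buffers after every record

    DataStream<Event> sourceA = env.addSource(
            new FlinkKafkaConsumer010<>("topic-a", new EventSchema(), kafkaProps));
    DataStream<Event> sourceB = env.addSource(
            new FlinkKafkaConsumer010<>("topic-b", new EventSchema(), kafkaProps));

    sourceA.keyBy("key")                              // "key" is a hypothetical POJO field
           .timeWindow(Time.milliseconds(100))        // the "fixed" (tumbling) windows
           .apply(new MyWindowFunction())
           .connect(sourceB.keyBy("key"))
           .map(new MyCoMapper())                     // a CoMapFunction over both streams
           .flatMap(new MyFlatMap());

    env.execute("parallelism-latency-test");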

 

 

Thanks,

Liron

 


Re: Lower Parallelism derives better latency

Aljoscha Krettek
Hi,

How are you measuring latency? Is it latency within a Flink job or from Kafka to Kafka? The first seems more likely, but I'm still interested in the details.

Best,
Aljoscha



Re: Lower Parallelism derives better latency

Stefan Richter
In reply to this post by liron
Hi,

one possible explanation that I see is the following: in a shuffle, there are input and output buffers for each parallel subtask to which data can be shuffled. Those buffers are flushed either when they are full or after a timeout interval. If you increase the parallelism, there are more buffers and each buffer gets a smaller fraction of the data. This, in turn, means that it takes longer until an individual buffer is full and its data is emitted. The timeout interval enforces an upper bound.

Your experiment works at a very small scale, and I would not assume that latency would keep increasing without bound - once you hit the buffer timeout interval, the latency should no longer increase. You could validate this by configuring smaller buffer sizes and testing how that impacts the experiment.
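
For example, one way to probe this (a sketch with illustrative values) is to vary the buffer timeout in the job and the network buffer segment size in the TaskManager configuration:

    // Upper bound on how long a record may sit in a network output buffer:
    //   0  -> flush after every record, -1 -> flush only when a buffer is full.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setBufferTimeout(5);   // illustrative 5 ms timeout

    // The buffer size itself is a TaskManager setting in flink-conf.yaml, e.g.:
    //   taskmanager.memory.segment-size: 4096    (default: 32768 bytes)
    // Smaller segments fill up, and are therefore flushed, sooner.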

Best,
Stefan



RE: Lower Parallelism derives better latency

liron
In reply to this post by Aljoscha Krettek

Hi Aljoscha,

The latency is measured with a Flink MetricGroup (specifically with a DropwizardHistogram).

The latency is measured from message read time (i.e., from when the message is pulled from the Kafka source) until the last operator completes its processing (there is no Kafka sink).
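
In rough terms, the measurement is wired up like this (a sketch only; the operator, metric, and field names are illustrative, not the real code), using the Dropwizard histogram wrapper from flink-metrics-dropwizard:

    public class LatencyTrackingFlatMap extends RichFlatMapFunction<Event, Event> {

        private transient Histogram latencyHistogram;  // org.apache.flink.metrics.Histogram

        @Override
        public void open(Configuration parameters) {
            com.codahale.metrics.Histogram dropwizard =
                    new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
            latencyHistogram = getRuntimeContext()
                    .getMetricGroup()
                    .histogram("endToEndLatencyMs", new DropwizardHistogramWrapper(dropwizard));
        }

        @Override
        public void flatMap(Event event, Collector<Event> out) {
            // event.readTimeMs is a hypothetical field stamped when the record
            // was pulled from the Kafka source.
            latencyHistogram.update(System.currentTimeMillis() - event.readTimeMs);
            out.collect(event);
        }
    }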

 

Thanks,

Liron

 


 


RE: Lower Parallelism derives better latency

liron
In reply to this post by Stefan Richter

Hi Stefan,

Thanks for replying.

All of the tests above were executed with a buffer timeout of zero:
 env.setBufferTimeout(0);

so the buffers were flushed after each record.

Any other explanation? :)

 

Thanks,

Liron

 

 


 


Re: Lower Parallelism derives better latency

Stefan Richter
Hi,

ok, that would have been good to know, so forget about my explanation attempt :-). This makes it interesting, and at the same time I cannot come up with an "easy" explanation. It is not even clear whether the reason for this is a general problem in Flink, your setup, or something that your job is doing. Two more questions: What happens to the throughput in that experiment - does it also decrease or increase? I just want to rule out that some general overhead is introduced. Second, do you have, or could you create, some (minimal) code example that reproduces the problem and that you could share with us (of course you can also share it in private)? This would be very helpful!
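
For example, a stripped-down job that keeps the general shape but replaces Kafka with a simple synthetic source would already be useful. A sketch (hypothetical event type, crude throttling):

    public class GeneratorSource extends RichParallelSourceFunction<Event> {

        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            long i = 0;
            while (running) {
                Event event = new Event(i++ % 1000, System.currentTimeMillis());
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(event);
                }
                Thread.sleep(0, 10_000);  // crude throttle; adjust to approximate the real input rate
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

It could then be plugged in with env.addSource(new GeneratorSource()) at the same parallelism as the Kafka sources.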

Best,
Stefan



RE: Lower Parallelism derives better latency

liron

Hi Stefan,

The throughput was the same in all of the executions. This was well validated in each test, as it is what I also suspected might be affected.
The throughput was ~110,000 messages per second.

Regarding the code example, this is a bit confidential; let me think about what I can do and get back to you.

Am I the first one to encounter such an issue?

 

Thanks,

Liron

 

 


 


Re: Lower Parallelism derives better latency

Stefan Richter
Hi,

ok, throughput sounds good then, and I assume there is also no unexpected increase in CPU usage? For the code example, maybe it is possible to minimize the code (dropping all the confidential business logic, using simple generator sources, ...) while still keeping the general shape of the job intact?

This is the first report of a problem like this that I am aware of, and unfortunately there could be many factors. For example, if you are using event time, could changing the parallelism impact your watermark progression and therefore the time it takes to trigger your windows?
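
To illustrate the event-time angle: a downstream window only fires once the minimum watermark over all of its input channels has passed the window end, so changing the number of parallel source/assigner instances can change how quickly windows trigger (an idle or slow subtask holds everything back). A sketch of a typical assigner (hypothetical event type and illustrative out-of-orderness bound):

    DataStream<Event> withTimestamps = source.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.milliseconds(50)) {
                @Override
                public long extractTimestamp(Event event) {
                    return event.eventTimeMs;  // hypothetical event-time field
                }
            });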

Best,
Stefan



Re: Lower Parallelism derives better latency

Stephan Ewen
Just to make sure:

  - This runs on one machine, so only local connections?




RE: Lower Parallelism derives better latency

liron

Hi,

Yes, one machine. Only local connections.

CPU wasn't really affected by the parallelism changes; the CPU consumption was ~28%.

I'm finding out whether I'm allowed to send the code and will update soon.

 

Thanks,

Liron

 
