Increase in parallelism has very bad impact on performance

Sidney Feiner
Hey,
I'm writing a Flink app that does some transformation on events consumed from Kafka, then creates time windows keyed by some field and applies an aggregation over all the events in each window.
When I run it with parallelism 1, I get a throughput of around 1.6K events per second (so also 1.6K events per slot). With parallelism 5, that goes down to 1.2K events per slot, and when I increase the parallelism to 10, it drops to 600 events per slot.
Which means that parallelism 5 and parallelism 10 give me the same total throughput (1.2K x 5 = 600 x 10 = 6K).
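Roughly, the pipeline looks like this (a simplified sketch with placeholder class and field names, not the actual code):

    import java.util.Properties;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092");  // placeholder
    props.setProperty("group.id", "my-group");             // placeholder

    env.addSource(new FlinkKafkaConsumer<>("events", new EventDeserializationSchema(), props))
        .map(new EnrichmentMapper())              // some transformation on each event
        .keyBy(event -> event.getKeyField())      // keyed by some field
        .timeWindow(Time.minutes(1))              // time windows (size is illustrative)
        .aggregate(new EventAggregator())         // aggregation over all events in the window
        .addSink(new MySink());                   // placeholder sink

    env.execute("my-job");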

I noticed that although I have 3 Task Managers, all the tasks run on the same machine, causing its CPU to spike, which is probably why the throughput decreases so dramatically. After increasing the parallelism to 15, tasks now run on 2 of the 3 machines, but the average throughput per slot is still around 600.

What could cause this dramatic decrease in performance?

Extra info:
  • Flink version 1.9.2
  • Flink High Availability mode
  • 3 task managers, 66 slots total

Execution plan:


Any help would be much appreciated 🙂


Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp

Re: Increase in parallelism has very bad impact on performance

Yangze Guo
Hi, Sidney,

What is the data generation rate of your Kafka topic? Is it a lot
bigger than 6000?

Best,
Yangze Guo



Re: Increase in parallelism has very bad impact on performance

Arvid Heise-3
Hi Sidney,

there are a couple of reasons why scaling can actually hurt. Let's go through them one by one.

First, you need to make sure that your source actually supports scaling. Your Kafka topic needs at least as many partitions as the parallelism you want to scale to, so if you want to scale to 66 parallel instances at some point, your Kafka topic must have at least 66 partitions. Of course, you can also read from fewer partitions, but then some source subtasks are idling; that's only acceptable if your downstream pipeline is much more resource intensive. Also note that it's really hard to increase the number of Kafka partitions later, so please plan accordingly.

Second, you have a windowing operation that uses hashes. It's really important to check whether the hashes are evenly distributed. You could have the issue that most records share the same key, and on top of that the issue that different keys share the same hash. In both cases, most records are processed by the same subtask, resulting in poor overall performance. (You can check for data skew, including hash skew, in the Web UI.)

Third, make sure that there is actually enough data to be processed. Does your topic contain enough data? If you want to process historic data, did you choose the correct consumer setting? Can your Kafka cluster provide enough data to the Flink job? If your max data rate from Kafka is 6K records/s, then of course the per-slot throughput decreases when you scale up.

Fourth, if you suspect that the clumping of used slots onto one task manager may be an issue, try out the configuration option cluster.evenly-spread-out-slots [1]. By default, Flink tends to fill one TM first to allow easier scale-in; if for some reason your TM saturates before all of its slots are used, you may try to spread the slots evenly instead. You may also want to check whether you declare too many slots per TM (in most cases slots = cores).





--

Arvid Heise | Senior Java Developer


Re: Increase in parallelism has very bad impact on performance

Sidney Feiner
In reply to this post by Yangze Guo
Hey, I just ran a simple consumer that does nothing but consume each event (without aggregating), and every slot handles above 3K events per second; with parallelism set to 15, it successfully handles 45K events per second.



Re: Increase in parallelism has very bad impact on performance

Sidney Feiner
In reply to this post by Arvid Heise-3
Hey 🙂

  1. I have 150 partitions in the Kafka topic
  2. I'll check that soon, but why doesn't the same happen with a smaller parallelism? If that were the reason, I'd expect the same behavior with a parallelism of 5 as well. How does increasing the parallelism decrease the throughput per slot?
  3. When I don't use a window function, it handles around 3K+ events per second per slot, so that shouldn't be the problem
  4. I tried this one right now, and the throughput remains 600 events per second per slot when parallelism is set to 15

Out of all those options, it seems like I have to investigate the 2nd one. The key is a 2-character string representing a country, so I don't think it's very probable for 2 different countries to have the same hash, but I know for a fact that the number of events is not evenly distributed between countries.

But still, why does the impact on performance appear only at higher parallelism?




Re: Increase in parallelism has very bad impact on performance

Arvid Heise-3
Hi Sidney,

you might want to recheck your first message. Either it's written incorrectly or you are a victim of a fallacy.

With 1 slot, you have 1.6K events per slot = 1.6K overall.
With parallelism 5, you have 1.2K events per slot, so 6K overall. That's a decent speedup.
With 10, you still have 6K overall.

So you haven't experienced any performance degradation (which is what your title suggests). It's rather that you hit a practical scale-up/out boundary.

Now of course, you'd like to see your system scale beyond that 6K into the realm of 45K per second, and I can assure you that that's well possible with your setup. However, we need to figure out why it's not doing so.

The most likely reason that would explain the behavior is indeed data skew. Your observation also matches it: even though you distribute your job, some slots are doing much more work than other slots.

So the first thing you should do is plot a histogram over country codes. What you will likely see is that ~20% of all records belong to the same country (US?). That's where your practical scale-up boundary comes from: since you group by country, there is no way to split that country's work across subtasks. Also check in the Flink Web UI which task is the bottleneck. I'm assuming it's the window operator (or rather everything after HASH) for now.

Btw, concerning hash collisions: even though a 2-letter ASCII string has in theory 26^2 = 676 combinations, you have <200 countries = unique values. Moreover, two values ending up in the same bucket is very common because the hash is remapped to your parallelism. So if your parallelism is 5, you have only 5 hash buckets into which you need to put 40 countries on average. Assume US, CN, UK are your countries with the most entries and a good hash function remapped to 5 buckets: there is a 4% probability of all three landing in the same bucket, but a better than 50% probability of at least two of them sharing a bucket.
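To make the bucket math concrete, here is a small sketch you can run locally (assuming Flink's KeyGroupRangeAssignment utility, which keyBy uses internally to map a key to an operator subtask; the country list and max parallelism are just examples):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    public class KeySkewCheck {
        public static void main(String[] args) {
            int parallelism = 5;
            int maxParallelism = 128;  // illustrative; Flink's default for small parallelism
            String[] countries = {"US", "CN", "GB", "DE", "FR", "IL", "IN", "BR", "JP", "RU"};

            Map<Integer, Integer> keysPerSubtask = new HashMap<>();
            for (String country : countries) {
                int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                        country, maxParallelism, parallelism);
                keysPerSubtask.merge(subtask, 1, Integer::sum);
                System.out.println(country + " -> subtask " + subtask);
            }
            // several countries typically collapse onto the same subtask; if one of them
            // dominates the traffic, that subtask becomes the bottleneck
            System.out.println("keys per subtask: " + keysPerSubtask);
        }
    }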

Nevertheless, even without collisions your scalability is limited by the largest country. That's independent of the system used and inherent to your query. So if you do see this data skew, the best way forward is to modify the query. Possible options:
- Use a more fine-grained key (country + state). That may not be possible due to semantics.
- Use multiple aggregation steps: (country + state) first, then country. Pre-aggregations are always good to have (see the sketch after this list).
- Reduce data volume by filtering before HASH. (You already have a filter, so I'm guessing that's not a valid option.)
- Pre-aggregate per Kafka partition key before HASH.
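A rough sketch of the multi-step aggregation idea, assuming the events carry country and state fields and that a simple count is the aggregate (class and field names are placeholders, not your actual code):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.time.Time;

    DataStream<Event> events = ...;

    // step 1: pre-aggregate on the fine-grained (country, state) key - this spreads the load
    DataStream<Tuple3<String, String, Long>> perState = events
        .map(e -> Tuple3.of(e.getCountry(), e.getState(), 1L))
        .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.LONG))
        .keyBy(t -> t.f0 + "|" + t.f1)
        .timeWindow(Time.minutes(1))
        .reduce((a, b) -> Tuple3.of(a.f0, a.f1, a.f2 + b.f2));

    // step 2: roll the pre-aggregates up per country - this operator sees far fewer records
    DataStream<Tuple2<String, Long>> perCountry = perState
        .map(t -> Tuple2.of(t.f0, t.f2))
        .returns(Types.TUPLE(Types.STRING, Types.LONG))
        .keyBy(t -> t.f0)
        .timeWindow(Time.minutes(1))
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));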

If you absolutely cannot make the aggregation more fine-grained, you need machines with stronger CPUs per slot (and it's also no use to go beyond a parallelism of 10).

I also noticed that you have several forward channels. There is usually no need for them; task chaining is much faster, especially if you enableObjectReuse [1].
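Object reuse is a one-line setting on the execution config; as far as I know it's safe as long as your functions don't cache or mutate records after emitting them:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // avoids defensive copies between chained operators
    env.getConfig().enableObjectReuse();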



Re: Increase in parallelism has very bad impact on performance

Sidney Feiner
You're right, this is a scaling problem (which, for me, is a performance problem).

As for what you were saying about the data skew, that could be it, so I tried running the job without keyBy: I wrote an aggregator that accumulates the events per country, and then a FlatMap that takes that map and returns a stream of events per country. I was hoping that this way I wouldn't have skewing problems, since all the data is actually handled in the same tasks (and I don't mind that).

But even after this change, I'm experiencing the same scaling limit. 

And I actually found something inefficient in my code, and now that I've fixed it the app seems to scale a bit better. I also decreased the time window, which improved the scaling some more.

So now I still hit a scaling limit but it seems a bit better already:
Parallelism | Throughput/sec | Throughput/slot/sec | Increase in parallelism (%) | Increase in events (%) | % of expected increase
1           | 2,630          | 2,630               | -                           | -                      | -
15          | 16,340         | 1,180               | 1500%                       | 621%                   | 41.4%
30          | 22,100         | 736                 | 200%                        | 135%                   | 67.5%
50          | 16,600         | 332                 | 166%                        | 75%                    | 45%

The last column checks how "linearly" the app actually scales. The best-case scenario is 100%: when the parallelism increases by 200%, the number of handled events increases by 200% as well.

It is pretty clear that my app is far from scaling linearly, and its throughput even decreases from parallelism 30 to parallelism 50.

What could cause these erratic percentages in the expected-increase column, even though I'm not using a KeyedWindow anymore?






Re: Increase in parallelism has very bad impact on performance

Arvid Heise-3
Hi Sidney,

could you describe how your aggregation works and what your current pipeline looks like? Is the aggregation partially applied before shuffling the data? I'm a bit lost on what aggregation without keyBy looks like.

A decrease in throughput may also be the result of more overhead and less available memory. It also depends on how long you wait with your measurements after starting (more parallelism = slower start). The way you measure may greatly influence the result and might explain the fluctuation.

Also, what does your slot distribution look like now?

Btw, from your description it still sounds like one big country may slow down the overall process, so a histogram over the countries would be very helpful.

Re: Increase in parallelism has very bad impact on performance

Sidney Feiner
I've changed it again: I now use a key again, but to randomly distribute the events I defined the key as the event key's hashCode modulo the parallelism (hoping this would ensure that my events are evenly distributed across all my slots).

To simplify the explanation, assume we work with a "Family" object with the following fields:
  • country
  • familyId
  • members (array of names)

So, this is the general flow:
  • Events are read from Kafka and mapped into a custom object relevant to our pipeline
  • keyBy(event.hashCode() % parallelism) is applied to the DataStream
  • Events are aggregated by an AggregateFunction that does the following:
    • Creates an entry in a map per country
    • Per country, holds an array of lists of families
      • Every list contains up to 3 families (in reality, much more)
      • Example (using JSON to simplify writing down the Java/Scala objects):
  • That map is passed through a FlatMap function that returns a collection of tuples with the country and a family group
      • Example (using JSON to simplify writing down the Java/Scala objects):
  • For each of those tuples, the country is translated to some URL, and then the URL and the list of Family objects are passed to the Sink (REST API sink)

So basically, the aggregator consumes events, batches them per country and maximum size, and then sends them to a REST API sink.
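Roughly, this is what the pipeline looks like now (a simplified sketch with placeholder class names, not the exact code):

    int parallelism = env.getParallelism();

    events
        // key by the event key's hashCode modulo the parallelism to spread events across slots
        .keyBy(event -> Math.abs(event.getKey().hashCode()) % parallelism)
        .timeWindow(Time.seconds(30))            // window size is illustrative
        .aggregate(new PerCountryBatcher())      // builds the country -> family-group batches
        .flatMap(new ExplodeCountryBatches())    // emits one (country, familyGroup) element per batch
        .map(new CountryToUrlMapper())           // translates the country into the REST endpoint URL
        .addSink(new RestApiSink());             // REST API sink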

Some more behaviors I've noticed:
  • When I keyed by country, throughput wasn't amazing, but I noticed that performance was more or less the same whether I used the REST API sink or a dummy sink that did nothing
  • Now that I've evenly distributed the events, throughput with the dummy sink is much better than before, BUT when I do use the REST API sink, performance suddenly drops drastically (with parallelism 15 it drops from 30K events per second to 1,000 events per second)

Another theoretical question, since your previous responses made me realize I had misunderstood the keying concept:
  • At first, I thought that events were consumed independently in every slot, and that every slot then created its own windows for every key and processed them independently
  • From your answers I can see that that's not true: keying actually shuffles the data so that all the events with the same key are grouped together and processed in a single slot.
So what does the flow actually look like? If I have 15 slots, do they independently consume events, then shuffle them based on the key, and then do those 15 slots stop consuming events and instead process the windows, and only resume consuming once they finish processing the windows?
Does that also mean that if I ever use the .windowAll function, which doesn't use any key, I will only have a single slot processing all the data and thus lose the whole point of parallelism?

And BTW, I really feel like I'm getting closer to the solution, and I seriously appreciate your help!




From: Arvid Heise <[hidden email]>
Sent: Wednesday, November 4, 2020 2:18 PM
To: Sidney Feiner <[hidden email]>
Cc: Yangze Guo <[hidden email]>; [hidden email] <[hidden email]>; Eyal Pe'er <[hidden email]>
Subject: Re: Increase in parallelism has very bad impact on performance
 
Hi Sidney,

could you describe how your aggregation works and how your current pipeline looks like? Is the aggregation partially applied before shuffling the data? I'm a bit lost on how aggregation without keyby looks like.

A decrease in throughput may also be a result of more overhead and less available memory. It also depends on how long you wait with your measurements after starting (as more parallelism = slower start). The way you measure may greatly influence the result and might explain the fluctuation.

Also how does your slot distribution now look like?

Btw from your description, it still sounds like a big country may slow down the overall process. So a histogram over the countries would be very helpful.

On Wed, Nov 4, 2020 at 12:01 PM Sidney Feiner <[hidden email]> wrote:
You're right, this is scale problem (for me that's performance).

As for what you were saying about the data skew, that could be it so I tried running the job without using keyBy and I wrote an aggregator that accumulates the events per country and then wrote a FlatMap that takes that map and returns a stream of events per country. I was hoping that that way I won't have skewing problems as all the data is actually handled in the same tasks (and I don't mind that).

But even after this change, I'm experiencing the same scaling limit. 

And I actually found something inefficient in my code and now that I've fixed it, the app seems to scale a bit better. I also decreased the time window which increased the scaling some more. 

So now I still hit a scaling limit but it seems a bit better already:
Parallelism Throughput/sec Throughput/slot/sec Increase in parallelism (%) Increase in events (%) % Of expected increase
1 2,630 2,630 - - -
15 16,340 1,180 1500% 621% 41.4%
30 22,100 736 200% 135% 67.5%
50 16,600 332 166% 75% 45%

The last column is to check how "linearly" the app actually scales. Best case scenario is 100% when the increase in parallelism is 200% and the increase in handled events increases by 200% as well. 

It is pretty clear to see that my app is far from scaling linearly, and its throughput even decreases from parallelism 30 to parallelism 50.

What could cause these weird and unstable numbers of % in expected increase even though I'm not using a KeyedWindow anymore?




Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skypesidney.feiner.startapp
 
emailsignature


From: Arvid Heise <[hidden email]>
Sent: Tuesday, November 3, 2020 8:54 PM
To: Sidney Feiner <[hidden email]>
Cc: Yangze Guo <[hidden email]>; [hidden email] <[hidden email]>
Subject: Re: Increase in parallelism has very bad impact on performance
 
Hi Sidney,

you might recheck your first message. Either it's incorrectly written or you are a victim of a fallacy.

With 1 slot, you have 1.6K events per slot = 1.6K overall.
With parallelism 5, you have 1.2K events per slot, so 6K overall. That's a decent speedup.
With 10, you still have 6K overall.

So you haven't experienced any performance degradation (what your title suggests). It's rather that you hit a practical scale-up/out boundary.

Now of course, you'd like to see your system to scale beyond that 6K into the realm of 45k per second and I can assure you that it's well possible in your setup. However, we need to figure out why it's not doing it.

The most likely reason that would explain the behavior is indeed data skew. Your observation also matches it: even though you distribute your job, some slots are doing much more work than other slots.

So the first thing that you should do is to plot a histogram over country codes. What you will likely see is that 20% of all records belong to the same country (US?). That's where your practical scale-up boundary comes from. Since you group by country, there is no way to calculate it in a distributed manner. Also check in Flink Web UI which tasks bottlenecks. I'm assuming it's the window operator (or rather everything after HASH) for now.

Btw concerning hash collisions: just because you have in theory some 26^2=676 combinations in a 2-letter ASCII string, you have <200 countries = unique values. Moreover, two values with the same hash is very common as the hash is remapped to your parallelism. So if your parallelism is 5, you have only 5 hash buckets where you need to put in 40 countries on average. Let's assume you have US, CN, UK as your countries with most entries and a good hash function remapped to 5 buckets, then you have 4% probability of having them all assigned to the same bucket, but almost 60% of two of them being in the same bucket.

Nevertheless, even without collisions your scalability is limited by the largest country. That's independent of the used system and inherent to your query. So if you indeed see this data skew, then the best way is to modify the query. Possible options:
- You use a more fine-grain key (country + state). That may not be possible due to semantics.
- You use multiple aggregation steps (country + state), then country. Preaggregations are always good to have.
- You can reduce data volume by filtering before HASH. (You already have a filter, so I'm guessing it's not a valid option)
- You preaggregate per Kafka partition key before HASH.

If you absolutely cannot make the aggregations more fine-grain, you need to use machines that have strong CPU slots. (it's also no use to go beyond parallelism of 10)

I also noticed that you have several forward channels. There is usually no need for them. Task chaining is much faster. Especially if you enableObjectReuse [1].



On Tue, Nov 3, 2020 at 3:14 PM Sidney Feiner <[hidden email]> wrote:
Hey 🙂

  1. I have 150 partitions in the kafka topic
  2. I'll check that soon but why doesn't the same happen when I use a smaller parallelism? If that was the reason, I'd expect the same behavior also if I had a parallelism of 5. How does the increase in parallelism, decrease the throughput per slot?
  3. When I don't use a window function, it handles around 3K+ events per second per slot, so that shouldn't be the problem
  4. Tried this one right now, and the througput remains 600 events per second per slot when parallelism is set to 15 

Out of all those options, seems like I have to investigate the 2nd one. The key is a 2-character string representing a country so I don't think it's very probable for 2 different countries to have the same hash, but I know for a fact that the number of events is not evenly distributed between countries.

But still, why does the impact in performance appear only for higher parallelism?


Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skypesidney.feiner.startapp
 
emailsignature



--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   


Reply | Threaded
Open this post in threaded view
|

Re: Increase in parallelism has very bad impact on performance

Arvid Heise-3
Hi Sidney,

I also feel we are getting closer. I see three potential improvements now: better serialization, getting rid of the flatMap, and async I/O.

1)
* How are you serializing the data in Flink? Avoid relying on Kryo at all costs [1].
* The easiest way for me is to go full Avro in the Kafka source, Flink, and the Kafka sink, with Confluent's schema registry. I can elaborate on that if needed.
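One way to make Kryo fallbacks visible early (a sketch, assuming that failing fast at submission is acceptable; not from the original thread): disabling generic types makes the job fail during translation if any type would be handled by Kryo, instead of silently paying the serialization cost at runtime.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NoKryoSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Throws during job translation if any data type would fall back to Kryo's
        // generic serializer, so inefficient serialization cannot slip in unnoticed.
        env.getConfig().disableGenericTypes();

        env.fromElements("a", "b", "c").print(); // placeholder pipeline
        env.execute("no-kryo sketch");
    }
}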

2) Could you please try the following pipeline:
* Read data and map into object
* Aggregate by (country, familyId)
* Aggregate by country (only if really needed)

Your use case actually looks like you can use the fine-grained key (country, familyId) to avoid data skew more or less entirely. You might even be able to skip the last aggregation from your latest version of the pipeline.
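A rough sketch of that two-stage aggregation, with the Family flattened into a tuple of (country, familyId, memberCount); the window size, the counting logic, and the placeholder source are assumptions, not the original job:

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TwoStageAggregationSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source of (country, familyId, memberCount); in the real job
        // this would be the mapped Kafka stream.
        DataStream<Tuple3<String, String, Integer>> families = env.fromElements(
                Tuple3.of("US", "f-1", 3), Tuple3.of("US", "f-2", 5), Tuple3.of("IL", "f-3", 2));

        // Stage 1: pre-aggregate on the fine-grained key (country, familyId).
        DataStream<Tuple2<String, Integer>> perFamily = families
                .keyBy(f -> f.f0 + "|" + f.f1)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .aggregate(new SumMembers());

        // Stage 2 (only if really needed): merge the pre-aggregates per country.
        perFamily
                .keyBy(t -> t.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .sum(1)
                .print();

        env.execute("two-stage aggregation sketch");
    }

    /** Sums member counts per key and keeps the country of the group. */
    private static class SumMembers implements
            AggregateFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> {
        @Override public Tuple2<String, Integer> createAccumulator() { return Tuple2.of("", 0); }
        @Override public Tuple2<String, Integer> add(Tuple3<String, String, Integer> in, Tuple2<String, Integer> acc) {
            return Tuple2.of(in.f0, acc.f1 + in.f2);
        }
        @Override public Tuple2<String, Integer> getResult(Tuple2<String, Integer> acc) { return acc; }
        @Override public Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
            return Tuple2.of(a.f0.isEmpty() ? b.f0 : a.f0, a.f1 + b.f1);
        }
    }
}

The first keyBy spreads the load over many (country, familyId) groups, so a single big country no longer pins everything to one subtask; the second stage only sees one small pre-aggregated record per family group.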

3) Which sink are you using? It may be a better option to use async I/O instead of a custom sink [2] (you don't need to have a sink in Flink). Also check in a standalone program how well your REST endpoint scales. If your REST service can handle only 1K events/second, then all other optimizations are moot. We can also optimize that in a follow-up mail.
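A hedged sketch of the async I/O variant [2]; it uses Java 11's HttpClient as a stand-in for the actual HTTP client, and the endpoint URL, batch type, timeout, and capacity are assumptions:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncRestSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> batches = env.fromElements("{\"families\":[]}"); // placeholder batches

        // At most 50 requests in flight per subtask, 30 s timeout per request.
        AsyncDataStream.unorderedWait(batches, new PostBatch(), 30, TimeUnit.SECONDS, 50).print();
        env.execute("async REST sketch");
    }

    /** Posts one already-serialized batch to a (placeholder) REST endpoint. */
    private static class PostBatch extends RichAsyncFunction<String, Integer> {
        private transient HttpClient client;

        @Override
        public void open(Configuration parameters) {
            client = HttpClient.newHttpClient();
        }

        @Override
        public void asyncInvoke(String jsonBatch, ResultFuture<Integer> resultFuture) {
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("https://example.invalid/ingest")) // placeholder URL
                    .POST(HttpRequest.BodyPublishers.ofString(jsonBatch))
                    .build();
            client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                    .thenAccept(resp -> resultFuture.complete(Collections.singletonList(resp.statusCode())))
                    .exceptionally(t -> { resultFuture.completeExceptionally(t); return null; });
        }
    }
}

The callback completes the ResultFuture from the HTTP client's thread, so the Flink task thread never blocks waiting for the REST endpoint.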




On Thu, Nov 5, 2020 at 12:28 AM Sidney Feiner <[hidden email]> wrote:
I've changed it again, and I now use a key again, but one that randomly distributes the events: I defined the key as the event's hashCode modulo the parallelism (hoping this would ensure that my events are evenly distributed across all my slots).

To simplify the explanation, assume we work with a "Family" object with the following fields:
  • country
  • familyId
  • members (array of names)

So, this is the general flow:
  • An event is read from Kafka and mapped into a custom object relevant to our pipeline
  • keyBy(event.hashCode() % parallelism) is then applied to the DataStream
  • Events are aggregated by an AggregateFunction that does the following:
    • Create an entry in a map per country
    • Per country, hold an array with lists of families
      • Every list contains up to 3 families (in reality, much more)
      • Example (using JSON to simplify writing down the Java/Scala objects): 
    • Pass that through a FlatMap function that returns a collection of tuples with the country and a family group.
      • Example (using JSON to simplify writing down the Java/Scala objects):
    • For each of those tuples, country is translated to some URL and then the URL and list of Family objects are passed to the Sink (Rest API sink)

So basically, the aggregator consumes events, batches them per country and maximum size and then sends them to a Rest API sink.

Some more behaviors that I've noticed:
  • When I keyed by country, throughput wasn't amazing, but I noticed that whether I used the Rest API sink or some dummy sink that did nothing, performance was more or less the same
  • Now that I've evenly distributed the events, throughput with the dummy sink is much better than before, BUT when I do use the Rest API sink, the performance suddenly drops drastically (with parallelism 15 it drops from 30K events per second to 1,000 events per second)

Another theoretical question: your previous responses made me realize I had misunderstood the keying concept:
  • At first, I thought that events were consumed independently in every slot, and then every slot created its own windows for every key and processed them independently
  • From your answers I can see that that's not true. Keying actually shuffles the data so that all the events with the same key are grouped together and processed in a single slot.
So what does the flow actually look like? If I have 15 slots, do they independently consume events, then shuffle them based on the key, and do those 15 slots then stop consuming events and instead process the windows, only consuming again once they finish processing the windows?
Does that mean that if I ever use the .windowAll function, which doesn't use any key, I will only have a single slot processing all the data and thus lose the whole point of parallelism?

And BTW, I really feel like I'm getting closer to the solution and I really seriously appreciate your help!


Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp
 


From: Arvid Heise <[hidden email]>
Sent: Wednesday, November 4, 2020 2:18 PM
To: Sidney Feiner <[hidden email]>
Cc: Yangze Guo <[hidden email]>; [hidden email] <[hidden email]>; Eyal Pe'er <[hidden email]>
Subject: Re: Increase in parallelism has very bad impact on performance
 
Hi Sidney,

could you describe how your aggregation works and what your current pipeline looks like? Is the aggregation partially applied before shuffling the data? I'm a bit lost on what aggregation without keyBy looks like.

A decrease in throughput may also be a result of more overhead and less available memory. It also depends on how long you wait with your measurements after starting (as more parallelism = slower start). The way you measure may greatly influence the result and might explain the fluctuation.

Also, what does your slot distribution look like now?

Btw from your description, it still sounds like a big country may slow down the overall process. So a histogram over the countries would be very helpful.

On Wed, Nov 4, 2020 at 12:01 PM Sidney Feiner <[hidden email]> wrote:
You're right, this is a scaling problem (for me that's performance).

As for what you were saying about the data skew, that could be it, so I tried running the job without using keyBy: I wrote an aggregator that accumulates the events per country and then a FlatMap that takes that map and returns a stream of events per country. I was hoping that this way I wouldn't have skewing problems, as all the data is actually handled in the same tasks (and I don't mind that).

But even after this change, I'm experiencing the same scaling limit. 

And I actually found something inefficient in my code and now that I've fixed it, the app seems to scale a bit better. I also decreased the time window which increased the scaling some more. 

So now I still hit a scaling limit but it seems a bit better already:
Parallelism | Throughput/sec | Throughput/slot/sec | Increase in parallelism (%) | Increase in events (%) | % of expected increase
1           | 2,630          | 2,630               | -                           | -                      | -
15          | 16,340         | 1,180               | 1500%                       | 621%                   | 41.4%
30          | 22,100         | 736                 | 200%                        | 135%                   | 67.5%
50          | 16,600         | 332                 | 166%                        | 75%                    | 45%

The last column checks how "linearly" the app actually scales: the best case is 100%, i.e. when the parallelism increases by 200%, the number of handled events increases by 200% as well.

It is pretty clear to see that my app is far from scaling linearly, and its throughput even decreases from parallelism 30 to parallelism 50.

What could cause these weird and unstable percentages of expected increase even though I'm not using a KeyedWindow anymore?




Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp
 


From: Arvid Heise <[hidden email]>
Sent: Tuesday, November 3, 2020 8:54 PM
To: Sidney Feiner <[hidden email]>
Cc: Yangze Guo <[hidden email]>; [hidden email] <[hidden email]>
Subject: Re: Increase in parallelism has very bad impact on performance
 
Hi Sidney,

you might recheck your first message. Either it's incorrectly written or you are a victim of a fallacy.

With 1 slot, you have 1.6K events per slot = 1.6K overall.
With parallelism 5, you have 1.2K events per slot, so 6K overall. That's a decent speedup.
With 10, you still have 6K overall.

So you haven't experienced any performance degradation (what your title suggests). It's rather that you hit a practical scale-up/out boundary.

Now of course, you'd like to see your system to scale beyond that 6K into the realm of 45k per second and I can assure you that it's well possible in your setup. However, we need to figure out why it's not doing it.

The most likely reason that would explain the behavior is indeed data skew. Your observation also matches it: even though you distribute your job, some slots are doing much more work than other slots.

So the first thing that you should do is to plot a histogram over country codes. What you will likely see is that 20% of all records belong to the same country (US?). That's where your practical scale-up boundary comes from. Since you group by country, there is no way to calculate it in a distributed manner. Also check in Flink Web UI which tasks bottlenecks. I'm assuming it's the window operator (or rather everything after HASH) for now.

Btw concerning hash collisions: just because you have in theory some 26^2=676 combinations in a 2-letter ASCII string, you have <200 countries = unique values. Moreover, two values with the same hash is very common as the hash is remapped to your parallelism. So if your parallelism is 5, you have only 5 hash buckets where you need to put in 40 countries on average. Let's assume you have US, CN, UK as your countries with most entries and a good hash function remapped to 5 buckets, then you have 4% probability of having them all assigned to the same bucket, but almost 60% of two of them being in the same bucket.

Nevertheless, even without collisions your scalability is limited by the largest country. That's independent of the used system and inherent to your query. So if you indeed see this data skew, then the best way is to modify the query. Possible options:
- You use a more fine-grain key (country + state). That may not be possible due to semantics.
- You use multiple aggregation steps (country + state), then country. Preaggregations are always good to have.
- You can reduce data volume by filtering before HASH. (You already have a filter, so I'm guessing it's not a valid option)
- You preaggregate per Kafka partition key before HASH.

If you absolutely cannot make the aggregations more fine-grain, you need to use machines that have strong CPU slots. (it's also no use to go beyond parallelism of 10)

I also noticed that you have several forward channels. There is usually no need for them. Task chaining is much faster. Especially if you enableObjectReuse [1].



On Tue, Nov 3, 2020 at 3:14 PM Sidney Feiner <[hidden email]> wrote:
Hey 🙂

  1. I have 150 partitions in the kafka topic
  2. I'll check that soon but why doesn't the same happen when I use a smaller parallelism? If that was the reason, I'd expect the same behavior also if I had a parallelism of 5. How does the increase in parallelism, decrease the throughput per slot?
  3. When I don't use a window function, it handles around 3K+ events per second per slot, so that shouldn't be the problem
  4. Tried this one right now, and the througput remains 600 events per second per slot when parallelism is set to 15 

Out of all those options, seems like I have to investigate the 2nd one. The key is a 2-character string representing a country so I don't think it's very probable for 2 different countries to have the same hash, but I know for a fact that the number of events is not evenly distributed between countries.

But still, why does the impact in performance appear only for higher parallelism?


Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp
 
emailsignature


From: Arvid Heise <[hidden email]>
Sent: Tuesday, November 3, 2020 12:09 PM
To: Yangze Guo <[hidden email]>
Cc: Sidney Feiner <[hidden email]>; [hidden email] <[hidden email]>
Subject: Re: Increase in parallelism has very bad impact on performance
 
Hi Sidney,

there could be a couple of reasons where scaling actually hurts. Let's include them one by one.

First, you need to make sure that your source actually supports scaling. Thus, your Kafka topic needs at least as many partitions as you want to scale. So if you want to scale at some point to 66 parallel instances. Your kafka topic must have at least 66 partitions. Ofc, you can also read from less partitions, but then some source subtasks are idling. That's valid if your downstream pipeline is much more resource intensive. Also note that it's really hard to increase the number of Kafka partitions later, so please plan accordingly.

Second, you have a Windowing operation that uses hashes. It's really important to check if the hashes are evenly distributed. So you first could have an issue that most records share the same key, but you could on top have the issue that different keys share the same hash. In these cases, most records are processed by the same subtask resulting in poor overall performance. (You can check for data skew incl. hash skew in Web UI).

Third, make sure that there is actually enough data to be processed. Does your topic contain enough data? If you want to process historic data, did you choose the correct consumer setting? Can your Kafka cluster provide enough data to the Flink job? If your max data rate is 6k records from Kafka, then ofc the per slot throughput decreases on scaling up.

Fourth, if you suspect that the clumping of used slots to one task manager may be an issue, try out the configuration cluster-evenly-spread-out-slots [1]. The basic idea is to use a TM fully first to allow easier scale-in. However, if for some reason your TM is more quickly saturated than it has slots, you may try to spread evenly. However, you may also want to check if you declare too many slots for each TM (in most cases slots = cores).



On Tue, Nov 3, 2020 at 4:01 AM Yangze Guo <[hidden email]> wrote:
Hi, Sidney,

What is the data generation rate of your Kafka topic? Is it a lot
bigger than 6000?

Best,
Yangze Guo

Best,
Yangze Guo


On Tue, Nov 3, 2020 at 8:45 AM Sidney Feiner <[hidden email]> wrote:
>
> Hey,
> I'm writing a Flink app that does some transformation on an event consumed from Kafka and then creates time windows keyed by some field, and apply an aggregation on all those events.
> When I run it with parallelism 1, I get a throughput of around 1.6K events per second (so also 1.6K events per slot). With parallelism 5, that goes down to 1.2K events per slot, and when I increase the parallelism to 10, it drops to 600 events per slot.
> Which means that parallelism 5 and parallelism 10, give me the same total throughput (1.2x5 = 600x10).
>
> I noticed that although I have 3 Task Managers, all the all the tasks are run on the same machine, causing it's CPU to spike and probably, this is the reason that the throughput dramatically decreases. After increasing the parallelism to 15 and now tasks run on 2/3 machines, the average throughput per slot is still around 600.
>
> What could cause this dramatic decrease in performance?
>
> Extra info:
>
> Flink version 1.9.2
> Flink High Availability mode
> 3 task managers, 66 slots total
>
>
> Execution plan:
>
>
> Any help would be much appreciated
>
>
> Sidney Feiner / Data Platform Developer
> M: +972.528197720 / Skype: sidney.feiner.startapp
>
>


--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   


--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   


--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   


--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   
Reply | Threaded
Open this post in threaded view
|

Re: Increase in parallelism has very bad impact on performance

Sidney Feiner
  1. We have a custom serializer. Events are produced with Protobuf serialization and then we deserialize them into a generic event we work with.
    1. Seems like the current bottlenecks are the aggregation and the sink, so for now I'll leave this part alone
  2. When you say familyId, do you mean familyId % parallelism? Because every family only has a single event, and that event already has all its members as a list in it. So no aggregation per family is needed; the point is to batch many families into a single array.
  3. Using a custom sink that uses Play's StandaloneAhcWSClient to send GET/POST requests to the endpoint
    1. Currently this works synchronously (we wait for the response). We've been thinking of making it async, but for that we first need some metric of how many requests failed in some time window, so we know whether to throw an exception and kill the pipeline
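For reference, a rough sketch of such a failure counter using Flink's metric system; the sink body and names are placeholders, not the actual StandaloneAhcWSClient sink:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/** Illustrates only the metric; the HTTP call itself is stubbed out. */
public class CountingRestSink extends RichSinkFunction<String> {
    private transient Counter failedRequests;

    @Override
    public void open(Configuration parameters) {
        // Shows up in the Web UI / metric reporters as "failedRequests".
        failedRequests = getRuntimeContext().getMetricGroup().counter("failedRequests");
    }

    @Override
    public void invoke(String batch, Context context) {
        try {
            sendToRestEndpoint(batch);
        } catch (Exception e) {
            failedRequests.inc(); // inspect this before deciding to fail the whole job
        }
    }

    private void sendToRestEndpoint(String batch) throws Exception {
        // placeholder for the real synchronous (or async) HTTP call
    }
}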

Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp
 

