KeyBy distribution across taskslots

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

KeyBy distribution across taskslots

Aggarwal, Ajay

I couldn’t find reference to it anywhere in the docs, so I thought I will ask here.

 

When I use KeyBy operator, say KeyBy (“customerId”) and some keys (i.e. customers) are way too noisy than others, is there a way to ensure that too many noisy customers do not land on the same taskslot? In general does flink attempts to keep the load balanced across different taskslots assigned to a KeyBy operator ?

 

I wouldn’t be surprised if the answer is “currently no”. Would like to know if something related is planned for future. Also would love to hear from others who ran into similar situation and how they addressed it.

 

Thanks.

 

Reply | Threaded
Open this post in threaded view
|

Re: KeyBy distribution across taskslots

Fabian Hueske-2
Hi,

The answer is in fact no.
Flink hash-partitions keys into Key Groups [1] which are uniformly assigned to tasks, i.e., a task can process more than one key group.
AFAIK, there are no plans to change this behavior.
Stefan (in CC) might be able to give more details on this.

Something that might be possible in the future is to be more clever about the key group - task assignment, e.g., taking state size or number of records into account.

Best,
Fabian


Am Mi., 27. Feb. 2019 um 17:23 Uhr schrieb Aggarwal, Ajay <[hidden email]>:

I couldn’t find reference to it anywhere in the docs, so I thought I will ask here.

 

When I use KeyBy operator, say KeyBy (“customerId”) and some keys (i.e. customers) are way too noisy than others, is there a way to ensure that too many noisy customers do not land on the same taskslot? In general does flink attempts to keep the load balanced across different taskslots assigned to a KeyBy operator ?

 

I wouldn’t be surprised if the answer is “currently no”. Would like to know if something related is planned for future. Also would love to hear from others who ran into similar situation and how they addressed it.

 

Thanks.

 

Reply | Threaded
Open this post in threaded view
|

Re: KeyBy distribution across taskslots

Yun Tang
Hi,

If you noticed that some key groups are hot and in high load, you could try to increase the total key groups number (by increase the max parallelism), but pay attention that it would cause previous checkpoint cannot be restored . With the help of this, we might let the hot key groups share some pressure to others.

If you noticed just some specific keys are really hot, you could try blink branch's local agg feature[1] in SQL by setting `sql.optimizer.agg.phase.enforcer` as `TWO_PHASE`. This feature will try to first aggregate keys locally and then send to next global aggregate node just like Hadoop's combine and reduce in some way.  Jark (in CC) might provide more information.

Best
Yun Tang




From: Fabian Hueske <[hidden email]>
Sent: Thursday, February 28, 2019 18:28
To: Aggarwal, Ajay
Cc: [hidden email]
Subject: Re: KeyBy distribution across taskslots
 
Hi,

The answer is in fact no.
Flink hash-partitions keys into Key Groups [1] which are uniformly assigned to tasks, i.e., a task can process more than one key group.
AFAIK, there are no plans to change this behavior.
Stefan (in CC) might be able to give more details on this.

Something that might be possible in the future is to be more clever about the key group - task assignment, e.g., taking state size or number of records into account.

Best,
Fabian


Am Mi., 27. Feb. 2019 um 17:23 Uhr schrieb Aggarwal, Ajay <[hidden email]>:

I couldn’t find reference to it anywhere in the docs, so I thought I will ask here.

 

When I use KeyBy operator, say KeyBy (“customerId”) and some keys (i.e. customers) are way too noisy than others, is there a way to ensure that too many noisy customers do not land on the same taskslot? In general does flink attempts to keep the load balanced across different taskslots assigned to a KeyBy operator ?

 

I wouldn’t be surprised if the answer is “currently no”. Would like to know if something related is planned for future. Also would love to hear from others who ran into similar situation and how they addressed it.

 

Thanks.

 

Reply | Threaded
Open this post in threaded view
|

Re: KeyBy distribution across taskslots

Congxian Qiu
Hi

Maybe you could add a prefix for each key, so the hot keys can distributed to many tasks.

Best, Congxian
On Feb 28, 2019, 21:16 +0800, Yun Tang <[hidden email]>, wrote:
Hi,

If you noticed that some key groups are hot and in high load, you could try to increase the total key groups number (by increase the max parallelism), but pay attention that it would cause previous checkpoint cannot be restored . With the help of this, we might let the hot key groups share some pressure to others.

If you noticed just some specific keys are really hot, you could try blink branch's local agg feature[1] in SQL by setting `sql.optimizer.agg.phase.enforcer` as `TWO_PHASE`. This feature will try to first aggregate keys locally and then send to next global aggregate node just like Hadoop's combine and reduce in some way.  Jark (in CC) might provide more information.

Best
Yun Tang




From: Fabian Hueske <[hidden email]>
Sent: Thursday, February 28, 2019 18:28
To: Aggarwal, Ajay
Cc: [hidden email]
Subject: Re: KeyBy distribution across taskslots
 
Hi,

The answer is in fact no.
Flink hash-partitions keys into Key Groups [1] which are uniformly assigned to tasks, i.e., a task can process more than one key group.
AFAIK, there are no plans to change this behavior.
Stefan (in CC) might be able to give more details on this.

Something that might be possible in the future is to be more clever about the key group - task assignment, e.g., taking state size or number of records into account.

Best,
Fabian


Am Mi., 27. Feb. 2019 um 17:23 Uhr schrieb Aggarwal, Ajay <[hidden email]>:

I couldn’t find reference to it anywhere in the docs, so I thought I will ask here.

 

When I use KeyBy operator, say KeyBy (“customerId”) and some keys (i.e. customers) are way too noisy than others, is there a way to ensure that too many noisy customers do not land on the same taskslot? In general does flink attempts to keep the load balanced across different taskslots assigned to a KeyBy operator ?

 

I wouldn’t be surprised if the answer is “currently no”. Would like to know if something related is planned for future. Also would love to hear from others who ran into similar situation and how they addressed it.

 

Thanks.