Custom Partitioning for Keyed Streams

Custom Partitioning for Keyed Streams

Martin, Nick

I have a set of stateful operators that rely on keyed state. There is substantial skew between keys (e.g. there will be 100 messages each on keys A and B, but only 10 messages each on keys C-J), and key assignment is dictated by the needs of my application, so I can’t choose keys in a way that will eliminate the skew. The skew is somewhat predictable (I know keys A and B will usually get roughly 10x as many messages as the rest) and fairly consistent across timescales (counting the messages on each key for 30 seconds would give a reasonably good guess at the distribution of messages over the next 10-20 minutes).

The problem I’m having is that the high-volume keys (A and B in the example) often end up in the same task slot and slow it down, while the low-volume ones are spread across the other slots, leaving them underloaded. I looked into the available physical partitioning functions, but that functionality appears to be generally incompatible with keyed streams, and I need access to keyed state to do my actual processing. Is there any way I can get better load balancing while still using keyed state?

Re: Custom Partitioning for Keyed Streams

Piotr Nowojski
Hi,

I don’t think it is possible to force two particular keys onto different nodes, since key-to-subtask assignment is based on hashes.
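
For reference, this is roughly how you can inspect that assignment, using Flink’s internal KeyGroupRangeAssignment utility (an internal API, so treat the sketch as illustrative, not stable):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyPlacementProbe {
    public static void main(String[] args) {
        int maxParallelism = 128; // Flink's default for small job parallelism
        int parallelism = 8;      // assumed operator parallelism

        for (String key : new String[] {"A", "B", "C", "D"}) {
            // key.hashCode() -> murmur hash -> key group -> operator subtask
            int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                    key, maxParallelism, parallelism);
            System.out.println("key " + key + " -> subtask " + subtask);
        }
    }
}

In principle you could probe keys like this offline and pick key names that happen to land on different subtasks, but that is fragile: the mapping changes whenever maxParallelism or the operator parallelism changes.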

For some cases, a pre-aggregation step (an initial aggregation done before the keyBy, followed by a final aggregation after the keyBy) can be a solution for handling data skew. With pre-aggregation, some (most?) of the work can be distributed and done on the source nodes instead of doing all of the heavy lifting on the destination node. It has not yet been merged into the Flink code, but it is entirely user-space code, which you could copy-paste (and adjust) into your project. The pull request containing the pre-aggregation is here:
Please pay attention to the limitations of this code (documented in the Javadoc).
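
To illustrate the idea only, here is a minimal hand-rolled sketch for a counting job (this is not the code from the pull request; the class name and flush threshold are assumptions):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Combines counts locally before the keyBy, so the subtask that owns a hot
 * key receives at most one partial sum per flush instead of every record.
 * Caveat: the local buffer is not checkpointed, so buffered counts can be
 * lost on failure -- one of the problems the pull request code addresses.
 */
public class PreAggregator
        implements FlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private static final int FLUSH_THRESHOLD = 1000; // tune per workload

    private final Map<String, Long> buffer = new HashMap<>();
    private int bufferedCount = 0;

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) {
        // combine locally; only partial sums travel through the network shuffle
        buffer.merge(value.f0, value.f1, Long::sum);
        if (++bufferedCount >= FLUSH_THRESHOLD) {
            for (Map.Entry<String, Long> e : buffer.entrySet()) {
                out.collect(Tuple2.of(e.getKey(), e.getValue()));
            }
            buffer.clear();
            bufferedCount = 0;
        }
    }
}

// usage: stream.flatMap(new PreAggregator()).keyBy(0).sum(1)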

If the above code doesn’t work for you for whatever reason, you can also try to implement some custom-tailored pre-aggregation, e.g. having two keyBy steps, where the first artificially splits the A and B keys into a couple of smaller sub-keys and the second keyBy merges/squashes the partial results. A sketch of that follows.
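
A minimal sketch of that two-step approach for a simple counting job (the salt count, window size, and all names are illustrative assumptions):

import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TwoStageSkewHandling {

    private static final int SALT_BUCKETS = 4; // assumed number of sub-keys per key

    public static DataStream<Tuple2<String, Long>> apply(DataStream<Tuple2<String, Long>> input) {
        return input
            // attach a random salt (salting every key is simpler than salting only A and B)
            .map(new MapFunction<Tuple2<String, Long>, Tuple3<String, Integer, Long>>() {
                @Override
                public Tuple3<String, Integer, Long> map(Tuple2<String, Long> v) {
                    return Tuple3.of(v.f0, ThreadLocalRandom.current().nextInt(SALT_BUCKETS), v.f1);
                }
            })
            // first keyBy: (key, salt) spreads a hot key over up to SALT_BUCKETS subtasks
            .keyBy(0, 1)
            .timeWindow(Time.seconds(30))
            .sum(2)
            // drop the salt again
            .map(new MapFunction<Tuple3<String, Integer, Long>, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> map(Tuple3<String, Integer, Long> v) {
                    return Tuple2.of(v.f0, v.f2);
                }
            })
            // second keyBy: merge the partial sums per original key
            .keyBy(0)
            .timeWindow(Time.seconds(30))
            .sum(1);
    }
}

The second stage then sees at most SALT_BUCKETS partial results per key per window, so the load on the final subtask is bounded no matter how hot a key is.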

Piotrek
