KeyBy/Rebalance overhead?

KeyBy/Rebalance overhead?

Komal Mariam
Hello everyone,

I want to get some insight into the keyBy (and rebalance) operations: according to my understanding, they partition our tasks over the defined parallelism and thus should make our pipeline faster.

I am reading a topic that contains 170,000,000 pre-stored records, with 11 Kafka partitions and a replication factor of 1. Hence I use .setStartFromEarliest() to read the stream.
My Flink setup is a 4-node cluster with 3 task managers, each with 10 cores, and 1 job manager with 6 cores (10 task slots per TM, hence I set the environment parallelism to 30).
 
There are about 10,000 object IDs, hence 10,000 keys. Right now I'm keeping the number of records fixed to get a handle on how fast they're being processed.

When I remove keyBy, I get the same results in 39 seconds as opposed to 52 seconds with keyBy. In fact, even when I vary the parallelism down to 10 or below, I still see the same extra overhead of 9 to 13 seconds. My data is mostly uniformly distributed on its key, so I can rule out skew. Rebalance likewise has the same latency as keyBy.

What I want to know is: what may be causing this overhead, and is there any way to decrease it?

Here's the script I'm running for testing purposes: 
--------------
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

// JSONKeyValueDeserializationSchema yields Jackson ObjectNode records
DataStream<ObjectNode> jsonStream = env.addSource(
        new FlinkKafkaConsumer<>("data", new JSONKeyValueDeserializationSchema(false), properties)
                .setStartFromEarliest());

DataStream<Point> myPoints = jsonStream.map(new jsonToPoint());

myPoints.keyBy("oID").filter(new findDistancefromPOI());

public class findDistancefromPOI extends RichFilterFunction<Point> {
    @Override
    public boolean filter(Point input) throws Exception {
        // distance from a fixed point of interest to the incoming point
        double distance = computeEuclideanDist(16.4199, 89.974, input.X(), input.Y());
        return distance > 0;
    }
}
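
computeEuclideanDist is a small helper not shown above; assuming plain Euclidean distance between the two coordinate pairs, it is roughly:

private static double computeEuclideanDist(double x1, double y1, double x2, double y2) {
    // straight-line distance between (x1, y1) and (x2, y2)
    return Math.sqrt((x1 - x2) * (x1 - x2) + (y1 - y2) * (y1 - y2));
}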


Best Regards,
Komal

Re: KeyBy/Rebalance overhead?

Komal Mariam
Anyone?


Re: KeyBy/Rebalance overhead?

vino yang
Hi Komal,

keyBy (hash partitioning, a logical partitioning) and rebalance (a physical partitioning) are two of the partitioning strategies supported by Flink. [1]

Generally speaking, partitioning incurs network communication (network shuffle) costs, which add to the overall time. The example you provided may benefit from operator chaining [2] if you remove the keyBy operation.
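
For instance (just a sketch against the snippet you posted, not tested on your job): without the keyBy, Flink can chain the map and the filter into one task, so records are handed over by method call instead of over the network:

// chained: map and filter run in the same task, no network shuffle
myPoints.filter(new findDistancefromPOI());

// keyBy inserts a hash shuffle between map and filter
myPoints.keyBy("oID").filter(new findDistancefromPOI());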

Best,
Vino



Re: KeyBy/Rebalance overhead?

Komal Mariam
Thank you [hidden email] for the reply. I suspect keyBy will be beneficial in those cases where my subsequent operators are computationally intensive, i.e. when their computation time exceeds the network reshuffling cost.

Regards,
Komal


Re: KeyBy/Rebalance overhead?

vino yang
Hi Komal,

Actually, the main factor in choosing the type of partitioning is your business logic. If you want to do aggregation logic over a group, you must use keyBy to guarantee correct semantics.
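
For example (a hypothetical extension of your job, assuming Point had a numeric "count" field): a rolling per-key aggregate is only correct if every record with the same key reaches the same parallel subtask, which is exactly what keyBy guarantees.

// all records with the same oID land on the same subtask,
// so the running sum per key is correct
myPoints.keyBy("oID").sum("count");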

Best,
Vino


Re: KeyBy/Rebalance overhead?

Arvid Heise-3
Hi Komal,

as a general rule of thumb, you want to avoid network shuffles as much as possible. As vino pointed out, you need to reshuffle if you need to group by key. Another frequent use case is rebalancing data in case of a heavy skew. Since neither applies to you, removing the keyBy is the best option.

If you want to retain it because you may experience skew in the future, there are only a couple of things you can do. You may tinker with networking settings to get smaller/larger network buffers (smaller = less latency, larger = more throughput) [1]; see the sketch below. Of course, you get better results if you have a faster network (running in the cloud, you can play around with different adapters). You could also test whether fewer/more machines are actually faster (fewer machines = less network traffic, more machines = more compute power).
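
As a concrete sketch of those knobs (Flink 1.9-era settings; the values here are illustrative, not recommendations):

// flush partially filled network buffers after at most 5 ms:
// a lower timeout favors latency, a higher one favors throughput
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setBufferTimeout(5);

// buffer size and count are configured in flink-conf.yaml, e.g.:
//   taskmanager.memory.segment-size: 16kb
//   taskmanager.network.memory.buffers-per-channel: 2
//   taskmanager.network.memory.floating-buffers-per-gate: 8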

In any case, your data volume is so low that I would probably not optimize too much. We are talking about seconds, and the times may vary greatly from run to run because of the low data volume. If you want to test the throughput as a POC for a larger volume, I'd either generate a larger sample or replicate it to get more reliable numbers, as sketched below. Either way, try to keep your final use case in mind when deciding on an option.
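
If you go the replication route, here is a minimal sketch of such a test source (hypothetical, with String records; adapt the type to your data):

import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// replays the same in-memory sample N times to get a larger, steadier workload
public class ReplaySource implements SourceFunction<String> {
    private final List<String> sample;
    private final int copies;
    private volatile boolean running = true;

    public ReplaySource(List<String> sample, int copies) {
        this.sample = sample;
        this.copies = copies;
    }

    @Override
    public void run(SourceContext<String> ctx) {
        for (int i = 0; i < copies && running; i++) {
            for (String record : sample) {
                ctx.collect(record);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}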



Re: KeyBy/Rebalance overhead?

Komal Mariam
Thank you so much for the detailed reply. I understand the usage of keyBy a lot better now. You are correct about the time variation too. We will try different network settings and extend our datasets to check performance on different use cases.
