Performance issues with GroupBy?


Performance issues with GroupBy?

Paschek, Robert

Hi Mailing List,

 

I am currently running benchmarks with different algorithms. The system has 8 nodes and a configured parallelism of 48 – the IBM-Power-1 cluster, in case somebody from the TU Berlin reads this :-) – and to “simulate” Hadoop MapReduce, the execution mode is set to “BATCH_FORCED”.

 

It is suspicious that three of the six algorithms show a big gap in runtime (5000 ms vs. 20000 ms) for easy (low-dimensional) tuples. Moreover, the algorithms in the “upper” group use a groupBy transformation, while the algorithms in the “lower” group don’t use groupBy.

I attached the plot for better visualization.

 

I also checked the logs, especially the times when the mappers finish and the reducers start _iterating_ – they confirmed my suspicion.

 

So my question is whether it is “normal” that grouping is so cost-intensive that – in my case – the runtime increases by a factor of 4.

I have data from the same experiments running on a 13-node cluster with 26 cores under Apache Hadoop MapReduce, where the gap is still present, but smaller (50 s vs. 57 s, or 55 s vs. 65 s).

 

Is there something I could do to optimize the grouping? Some code snippets:

 

The Job:
DataSet<?> output = input
        .mapPartition(new MR_GPMRS_Mapper())
        .withBroadcastSet(metaData, "MetaData")
        .withParameters(parameters)
        .name(MR_GPMRS_OPTIMIZED.class.getSimpleName() + "_MAPPER")
        .groupBy(0)
        .reduceGroup(new MR_GPMRS_Reducer())
        .returns(input.getType())
        .withBroadcastSet(metaData, "MetaData")
        .withParameters(parameters)
        .name(MR_GPMRS_OPTIMIZED.class.getSimpleName() + "_REDUCER");

 

MR_GPMRS_Mapper():

public class MR_GPMRS_Mapper<T extends Tuple> extends RichMapPartitionFunction<T, Tuple2<Integer, Tuple3<ArrayList<ArrayList<T>>, BitSet, BitSet>>>

 

MR_GPMRS_Reducer():

public class MR_GPMRS_Reducer<T extends Tuple> extends RichGroupReduceFunction<Tuple2<Integer, Tuple3<ArrayList<ArrayList<T>>, BitSet, BitSet>>, T>

 

The Tuple2 carries the Tuple3 as payload in position f1 and the Integer grouping key in position f0.

 

Any suggestions (or comments that this is “normal” behaviour) are welcome :-)

 

Thank you in advance!

Robert


Attachment: indep.pdf (291K)

Re: Performance issues with GroupBy?

Gábor Gévay
Hello Robert,

> Is there something I might could do to optimize the grouping?

You can try to make your `RichGroupReduceFunction` implement the
`GroupCombineFunction` interface, so that Flink can do combining
before the shuffle, which might significantly reduce the network load.
(How much the combiner helps performance depends greatly on how large your groups are on average.)
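A minimal sketch of the pattern Gábor describes, for the Flink DataSet API: the group reduce function additionally implements `GroupCombineFunction`, so Flink can run `combine()` on the map side before the shuffle. The class name and the `Tuple2<String, Integer>` element type here are illustrative stand-ins, not Robert’s actual `MR_GPMRS_Reducer`.

```java
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Sketch: a summing group reduce made combinable by also implementing
// GroupCombineFunction. Element type and class name are illustrative.
public class SummingReducer
        extends RichGroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
        implements GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    @Override
    public void reduce(Iterable<Tuple2<String, Integer>> values,
                       Collector<Tuple2<String, Integer>> out) {
        String key = null;
        int sum = 0;
        for (Tuple2<String, Integer> v : values) {
            key = v.f0;
            sum += v.f1;
        }
        out.collect(new Tuple2<>(key, sum));
    }

    // Runs on the map side before the shuffle. Its output type must match
    // the reduce input type; since summing is associative, the combiner
    // can simply reuse reduce() to emit partial sums.
    @Override
    public void combine(Iterable<Tuple2<String, Integer>> values,
                        Collector<Tuple2<String, Integer>> out) {
        reduce(values, out);
    }
}
```

This is only worthwhile when several records per key arrive at the same task, as discussed below.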

Alternatively, if you can reformulate your algorithm to use a `reduce`
instead of a `reduceGroup`, that might also improve the performance.
Also, if you are using a `reduce`, then you can try calling
`.setCombineHint(CombineHint.HASH)` after the reduce. (The combine
hint is a relatively new feature, so you need the current master for
this.)
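For the `reduce` variant, the combine hint is a one-line addition after the reduce. A fragment along these lines, where `input` and the `Tuple2<Integer, Long>` element type are illustrative (requires Flink 1.1+, per the note above):

```java
// Fragment: reduce with a hash-based map-side combiner (Flink 1.1+).
// `input` and the Tuple2<Integer, Long> element type are illustrative.
DataSet<Tuple2<Integer, Long>> sums = input
        .groupBy(0)
        .reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
            @Override
            public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> a,
                                                Tuple2<Integer, Long> b) {
                return new Tuple2<>(a.f0, a.f1 + b.f1);
            }
        })
        .setCombineHint(CombineHint.HASH);
```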

Best,
Gábor



2016-07-25 14:06 GMT+02:00 Paschek, Robert <[hidden email]>:


Re: Performance issues with GroupBy?

Ufuk Celebi
+1 to what Gábor said. The hash combine will be part of the upcoming
1.1 release, too.

This could be further amplified by the blocking intermediate results,
which currently have a very simplistic implementation that writes out
many separate files and can lead to a lot of random I/O.

– Ufuk

On Tue, Jul 26, 2016 at 11:41 AM, Gábor Gévay <[hidden email]> wrote:


Re: Performance issues with GroupBy?

Greg Hogan
In reply to this post by Paschek, Robert
Hi Robert,

Are you able to simplify your function input/output types? Flink aggressively serializes the data stream, and complex types such as ArrayList and BitSet are much slower to process. Are you able to restructure the lists into groupings on elements?
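One hypothetical reading of that suggestion: instead of shipping one nested `Tuple2<Integer, Tuple3<ArrayList<ArrayList<T>>, BitSet, BitSet>>` per partition, the mapper emits one flat record per element and lets `groupBy` do the grouping. Everything here is a sketch; `FlatMapper` and `computeKey()` are made-up placeholders, not part of Robert’s code or any Flink API.

```java
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Hypothetical restructuring: one flat Tuple2<key, element> per input
// element instead of nested ArrayLists and BitSets, so Flink serializes
// small tuples rather than generic collection types.
public class FlatMapper<T extends Tuple>
        extends RichMapPartitionFunction<T, Tuple2<Integer, T>> {

    @Override
    public void mapPartition(Iterable<T> values,
                             Collector<Tuple2<Integer, T>> out) {
        for (T value : values) {
            out.collect(new Tuple2<>(computeKey(value), value));
        }
    }

    // Placeholder for whatever key derivation the algorithm actually needs.
    private int computeKey(T value) {
        return value.hashCode();
    }
}
```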

Greg

On Mon, Jul 25, 2016 at 8:06 AM, Paschek, Robert <[hidden email]> wrote:


Re: Performance issues with GroupBy?

Paschek, Robert

Hi Gábor, hi Ufuk, hi Greg,

 

thank you for your very helpful responses!

 

> You can try to make your `RichGroupReduceFunction` implement the

> `GroupCombineFunction` interface, so that Flink can do combining

> before the shuffle, which might significantly reduce the network load.

> (How much the combiner helps the performance can greatly depend on how

> large are your groups on average.)


While implementing my reducers I didn’t think that combining would be applicable, because each mapper produces each key only once. I hadn’t considered that several mappers run on the same machine and therefore benefit from pre-combining before shuffling.

After I implemented the combiner mentioned here

https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/dataset_transformations.html#combinable-groupreducefunctions

the runtime difference between these algorithms with and without a combiner decreased. Thank you for the hint!

 

One curious thing: my reducer receives broadcast variables and accesses them in the open() method.

When the combine() method is called, the broadcast variable does not seem to be set yet: I get an explicit error message within the open() method. Is this a potential bug? I am using Apache Flink 1.0.3.

 

To avoid changing my reducers, I am wondering whether I should instead implement a GroupCombineFunction that is independent of the reducer:

https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/dataset_transformations.html#groupcombine-on-a-grouped-dataset

 

As far as I understand, this would work similarly?

-          The following transformation – especially the groupBy() – will run first on each local machine:

     combinedWords = input
             .groupBy(0)
             .combineGroup(new GroupCombineFunction<String, Tuple2<String, Integer>>())

 

-          And then the following transformation will shuffle the data over the network within the second groupBy():

     output = combinedWords
             .groupBy(0)
             .reduceGroup(new GroupReduceFunction())
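For completeness, the combiner in the first step could look like the word-count combiner on the documentation page linked above; this is a sketch along those lines, with illustrative types:

```java
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Sketch of a standalone combiner, modeled on the combinable word-count
// example in the Flink DataSet transformations documentation.
// Types and the class name are illustrative.
public class WordCombiner
        implements GroupCombineFunction<String, Tuple2<String, Integer>> {

    @Override
    public void combine(Iterable<String> words,
                        Collector<Tuple2<String, Integer>> out) {
        String key = null;
        int count = 0;
        for (String word : words) {
            key = word;
            count++;
        }
        // Emit one partial count per key and local partition; the
        // downstream groupBy + reduceGroup merges the partial counts.
        out.collect(new Tuple2<>(key, count));
    }
}
```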

 

 

> Alternatively, if you can reformulate your algorithm to use a `reduce`

> instead of a `reduceGroup` that might also improve the performance.

> Also, if you are using a `reduce`, then you can try calling

> `.setCombineHint(CombineHint.HASH)` after the reduce. (The combine

> hint is a relatively new feature, so you need the current master for

> this.)

I have to iterate over the tuples multiple times, and the final result can only be emitted after the last tuple has been processed, so I think I can’t use a reduce.

 

> This could be further amplified by the blocking intermediate results, which have a very simplistic implementation writing out many different files, which can lead to a lot of random I/O.

Thank you for this technical explanation. I will mention it in my evaluation!

 

> Are you able to simplify the your function input / output types? Flink aggressively serializes the data stream and complex types such as ArrayList and BitSet will be much slower to process. Are you able to reconstruct the lists to be groupings on elements?
For my intention – to simulate Apache Hadoop MapReduce-like behaviour – I would say my current implementation fits.
I will think about rewriting the code after the first benchmarks, to potentially reveal advantages of Apache Flink over Hadoop MapReduce for the algorithms I implemented.

 

Thanks again!

Robert

 

 
