Hi Mailing List,

I am currently running some benchmarks with different algorithms. The system has 8 nodes and a configured parallelism of 48 (the IBM-Power-1 cluster, in case somebody from TU Berlin reads this :-) ), and to "simulate" Hadoop MapReduce, the execution mode is set to "BATCH_FORCED".

It is striking that three of the six algorithms show a big gap in runtime (5,000 ms vs. 20,000 ms) for easy (low-dimensional) tuples. In addition, the algorithms in the "upper" group use a groupBy transformation, while the algorithms in the "lower" group don't use groupBy. I attached the plot for better visualization.

I also checked the logs, in particular the times when the mappers finish and the reducers start _iterating_, and they confirmed my suspicion.

So my question is whether it is "normal" that grouping is so expensive that, in my case, the runtime increases by a factor of four. I have data from the same experiments running with Apache Hadoop MapReduce on a 13-node cluster with 26 cores, where the gap is still present, but smaller (50 s vs. 57 s, or 55 s vs. 65 s).

Is there something I could do to optimize the grouping? Some code snippets:

The Job:

DataSet<?> output = input
    .mapPartition(new MR_GPMRS_Mapper())
        .withBroadcastSet(metaData, "MetaData")
        .withParameters(parameters)
        .name(MR_GPMRS_OPTIMIZED.class.getSimpleName() + "_MAPPER")
    .groupBy(0)
    .reduceGroup(new MR_GPMRS_Reducer())
        .returns(input.getType())
        .withBroadcastSet(metaData, "MetaData")
        .withParameters(parameters)
        .name(MR_GPMRS_OPTIMIZED.class.getSimpleName() + "_REDUCER");

MR_GPMRS_Mapper():

public class MR_GPMRS_Mapper<T extends Tuple> extends
    RichMapPartitionFunction<T, Tuple2<Integer, Tuple3<ArrayList<ArrayList<T>>, BitSet, BitSet>>>

MR_GPMRS_Reducer():

public class MR_GPMRS_Reducer<T extends Tuple> extends
    RichGroupReduceFunction<Tuple2<Integer, Tuple3<ArrayList<ArrayList<T>>, BitSet, BitSet>>, T>

The Tuple2 carries the Tuple3 as payload in position f1 and the Integer grouping key in position f0.

Any suggestions (or comments that this is "normal" behaviour) are welcome :-)

Thank you in advance!
Robert

indep.pdf (291K) Download Attachment
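For reference, forcing blocking data exchanges is a setting on the ExecutionConfig; a minimal sketch of how that is typically configured (assuming a standard batch ExecutionEnvironment named env):

    import org.apache.flink.api.common.ExecutionMode;
    import org.apache.flink.api.java.ExecutionEnvironment;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Force every data exchange to be fully materialized (blocking) instead of
    // pipelined, which is closer to how Hadoop MapReduce stages map and reduce.
    env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);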
Hello Robert,
> Is there something I could do to optimize the grouping?

You can try to make your `RichGroupReduceFunction` implement the `GroupCombineFunction` interface, so that Flink can do combining before the shuffle, which might significantly reduce the network load. (How much the combiner helps the performance can greatly depend on how large your groups are on average.)

Alternatively, if you can reformulate your algorithm to use a `reduce` instead of a `reduceGroup`, that might also improve the performance. Also, if you are using a `reduce`, you can try calling `.setCombineHint(CombineHint.HASH)` after the reduce. (The combine hint is a relatively new feature, so you need the current master for this.)

Best,
Gábor
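For illustration, a minimal sketch of that pattern with a simplified record type (Tuple2<Integer, Long> standing in for the real payload; MySumReducer is a hypothetical class, not one of the MR_GPMRS classes from this thread). The key point is that the combiner emits the same type that the reduce function consumes, so Flink can run combine() on the sending side before the shuffle:

    import org.apache.flink.api.common.functions.GroupCombineFunction;
    import org.apache.flink.api.common.functions.RichGroupReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class MySumReducer
            extends RichGroupReduceFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>>
            implements GroupCombineFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>> {

        @Override
        public void reduce(Iterable<Tuple2<Integer, Long>> values,
                           Collector<Tuple2<Integer, Long>> out) {
            // Final aggregation on the reduce side: sum all (possibly pre-combined) values per key.
            Integer key = null;
            long sum = 0L;
            for (Tuple2<Integer, Long> value : values) {
                key = value.f0;
                sum += value.f1;
            }
            out.collect(new Tuple2<>(key, sum));
        }

        @Override
        public void combine(Iterable<Tuple2<Integer, Long>> values,
                            Collector<Tuple2<Integer, Long>> out) {
            // Map-side pre-aggregation: same logic, emitting a partial sum of the
            // same type, so less data is sent over the network.
            reduce(values, out);
        }
    }

Because the class implements GroupCombineFunction with matching types, `input.groupBy(0).reduceGroup(new MySumReducer())` lets the optimizer insert a combine step before the shuffle automatically.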
+1 to what Gábor said. The hash combine will be part of the upcoming 1.1 release, too.

This could be further amplified by the blocking intermediate results, which have a very simplistic implementation that writes out many different files; this can lead to a lot of random I/O.

– Ufuk
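If the algorithm could be expressed as a `reduce`, the hash combine mentioned above would be enabled roughly like this (a sketch against the 1.1-line API; `input` is assumed to be a DataSet<Tuple2<Integer, Long>> and the reduce function is only a placeholder):

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple2;

    DataSet<Tuple2<Integer, Long>> sums = input
        .groupBy(0)
        .reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
            @Override
            public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> a, Tuple2<Integer, Long> b) {
                // Merge two records with the same key into one.
                return new Tuple2<>(a.f0, a.f1 + b.f1);
            }
        })
        // Use a hash table instead of sort-based combining for the pre-aggregation.
        .setCombineHint(CombineHint.HASH);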
Hi Robert,

Are you able to simplify your function input / output types? Flink aggressively serializes the data stream, and complex types such as ArrayList and BitSet will be much slower to process. Are you able to reconstruct the lists to be groupings on elements?

Greg
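A hedged sketch of that idea: instead of having the mapper build nested ArrayLists and BitSets and ship one big Tuple2<Integer, Tuple3<...>> per key, it could emit one flat record per element and let groupBy collect the group, so Flink only has to serialize small tuples. This is only an illustration (FlatEmitMapper and the key derivation are hypothetical), not a drop-in replacement for the MR_GPMRS classes:

    import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class FlatEmitMapper<T extends Tuple>
            extends RichMapPartitionFunction<T, Tuple2<Integer, T>> {

        @Override
        public void mapPartition(Iterable<T> values, Collector<Tuple2<Integer, T>> out) {
            for (T value : values) {
                // Hypothetical key derivation; the real job already computes an
                // Integer grouping key for each element.
                int key = value.hashCode();
                out.collect(new Tuple2<>(key, value));
            }
        }
    }

The downstream `groupBy(0).reduceGroup(...)` would then rebuild any per-group structure inside the reducer instead of carrying it through the shuffle.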
Hi Gábor, hi Ufuk, hi Greg,

thank you for your very helpful responses!

> You can try to make your `RichGroupReduceFunction` implement the
> `GroupCombineFunction` interface, so that Flink can do combining
> before the shuffle, which might significantly reduce the network load.
> (How much the combiner helps the performance can greatly depend on how
> large your groups are on average.)
After I implemented the combiner mentioned here, the runtime difference between these algorithms with and without a combiner decreased. Thank you for the hint!

One curious thing: my reducer receives broadcast variables and accesses them in the open() method. When the combine() method is called, the broadcast variable does not seem to be set yet: I get an explicit error message within the open() method. Is this a potential bug? I am using Apache Flink 1.0.3.

To avoid changing my reducers, I am wondering whether I should instead implement a GroupCombineFunction that is independent of the reducer. As far as I understand, this would work similarly?
- The following transformation, in particular its groupBy(), would run first, locally on each machine:

  combinedWords = input
      .groupBy(0)
      .combineGroup(new GroupCombineFunction<String, Tuple2<String, Integer>>() { ... });

- Then the following transformation would shuffle the data over the network with the second groupBy():

  output = combinedWords
      .groupBy(0)
      .reduceGroup(...);
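For concreteness, a rough sketch of how such a standalone combiner could be wired into the job from this thread. MR_GPMRS_Combiner is a hypothetical class, and the merge logic shown is only an assumption (it simply concatenates the element lists and ORs the BitSets); the real pre-aggregation depends on the algorithm:

    import java.util.ArrayList;
    import java.util.BitSet;

    import org.apache.flink.api.common.functions.GroupCombineFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.util.Collector;

    public class MR_GPMRS_Combiner<T extends Tuple> implements
            GroupCombineFunction<Tuple2<Integer, Tuple3<ArrayList<ArrayList<T>>, BitSet, BitSet>>,
                                 Tuple2<Integer, Tuple3<ArrayList<ArrayList<T>>, BitSet, BitSet>>> {

        @Override
        public void combine(
                Iterable<Tuple2<Integer, Tuple3<ArrayList<ArrayList<T>>, BitSet, BitSet>>> values,
                Collector<Tuple2<Integer, Tuple3<ArrayList<ArrayList<T>>, BitSet, BitSet>>> out) {
            // Assumed pre-aggregation: collapse all partial payloads of one key into a
            // single record so that less data goes over the network.
            Tuple2<Integer, Tuple3<ArrayList<ArrayList<T>>, BitSet, BitSet>> merged = null;
            for (Tuple2<Integer, Tuple3<ArrayList<ArrayList<T>>, BitSet, BitSet>> value : values) {
                if (merged == null) {
                    merged = value;
                } else {
                    merged.f1.f0.addAll(value.f1.f0); // concatenate the element lists
                    merged.f1.f1.or(value.f1.f1);     // assumption: BitSets can be OR-merged
                    merged.f1.f2.or(value.f1.f2);
                }
            }
            out.collect(merged);
        }
    }

The job would then read `.groupBy(0).combineGroup(new MR_GPMRS_Combiner()).groupBy(0).reduceGroup(new MR_GPMRS_Reducer())...`, and since the combiner needs no broadcast set, the broadcast-variable problem in the combine phase would not arise.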
> Alternatively, if you can reformulate your algorithm to use a `reduce`
> instead of a `reduceGroup`, that might also improve the performance.
> Also, if you are using a `reduce`, you can try calling
> `.setCombineHint(CombineHint.HASH)` after the reduce. (The combine
> hint is a relatively new feature, so you need the current master for this.)

I have to iterate through each tuple multiple times, and the final result can only be emitted after the last tuple is processed, so I think I can't use a reduce.

> This could be further amplified by the blocking intermediate results, which have a very
> simplistic implementation that writes out many different files; this can lead to a lot of random I/O.

Thank you for this technical explanation. I will mention it in my evaluation!

> Are you able to simplify your function input / output types? Flink aggressively serializes the
> data stream, and complex types such as ArrayList and BitSet will be much slower to process.
> Are you able to reconstruct the lists to be groupings on elements?

Thanks again!
Robert