combineGroup get false results

anissa moussaoui
Hi,

I used the combineGroup function to reduce the groups of a very large dataset. With parallelism set to 1 I get different results than with parallelism set to 8, and the correct results are the ones obtained with parallelism 1.

I also used the Table API to group the dataset and select the sum of a column; that gave the same results as combineGroup with parallelism 8.

My combineGroup function:

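import java.util.Iterator;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// indexField is not declared here; presumably it is inherited from Resampling.
public class SumGroup extends Resampling implements GroupReduceFunction<Row, Row> {

    @Override
    public void reduce(Iterable<Row> values, Collector<Row> out) throws Exception {
        Iterator<Row> itr = values.iterator();
        double sum = 0;
        Row row = null;
        // Sum the field at indexField over all rows of the group.
        while (itr.hasNext()) {
            row = itr.next();
            sum = sum + (double) row.getField(indexField);
        }
        // Emit the last row of the group, with the summed value in indexField.
        row.setField(indexField, sum);
        out.collect(row);
    }
}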

I call this function like this:

DataSet<Row> ds = env.toDataSet(inputTable(env), Row.class)
        .groupBy(groupFields.stream().toArray(String[]::new))
        .sortGroup(sortField, Order.ASCENDING)
        .reduceGroup(new SumGroup())
        .returns(getOutputType());

Results with parallelism 1:
24000 cluster 1 28/02/2017 06:00:00
21000 cluster 1 31/03/2017 06:00:00
15000 cluster 1 30/04/2017 06:00:00
10000 cluster 1 31/05/2017 06:00:00

Results with parallelism 8:
22000 cluster 1 28/02/2017 06:00:00
4350 cluster 1 31/03/2017 06:00:00
14000 cluster 1 30/04/2017 06:00:00
2256 cluster 1 31/05/2017 06:00:00

I do not know how to make the results independent of the parallelism, that is, how to get the parallelism-1 results when running with parallelism 8.

Does anyone have an idea that could help me?

Thank you in advance!

Anissa

Re: combineGroup get false results

Fabian Hueske-2
Hi Anissa,

Are you using combineGroup or reduceGroup?
Your question refers to combineGroup, but the code only shows reduceGroup.

combineGroup is non-deterministic by design to enable efficient partial results without network and disk IO.
reduceGroup is deterministic given a deterministic key extractor and deterministic GroupReduceFunction.
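
To illustrate the distinction, here is a minimal sketch in plain Java with no Flink dependency. It only emulates the idea: a combiner emits one partial result per partition, so its output depends on how the data happens to be split, while a final reduce over those partials gives the same total either way. The class name, the helper methods and the numbers are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CombineVsReduce {

    // Sums all values; plays the role of the full (final) group reduce.
    static double sum(List<Double> values) {
        double total = 0;
        for (double v : values) {
            total += v;
        }
        return total;
    }

    // Emulates a combiner: emits one partial sum per partition.
    static List<Double> combine(List<List<Double>> partitions) {
        List<Double> partials = new ArrayList<>();
        for (List<Double> partition : partitions) {
            partials.add(sum(partition));
        }
        return partials;
    }

    public static void main(String[] args) {
        // The same four made-up values, split into one or two partitions.
        List<List<Double>> onePartition = Arrays.asList(
                Arrays.asList(1000.0, 2000.0, 3000.0, 4000.0));
        List<List<Double>> twoPartitions = Arrays.asList(
                Arrays.asList(1000.0, 2000.0),
                Arrays.asList(3000.0, 4000.0));

        System.out.println(combine(onePartition));        // [10000.0]
        System.out.println(combine(twoPartitions));       // [3000.0, 7000.0]
        System.out.println(sum(combine(twoPartitions)));  // 10000.0
    }
}
```

If a job emitted the combiner output directly, without a final reduce over the partial results, sums that look too small at higher parallelism would be one possible symptom.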

Hope this helps,
Fabian

(In reply to anissa moussaoui's message of Tue, 20 Aug 2019, 14:21.)

Re: combineGroup get false results

anissa moussaoui
Thanks for your feedback!

Sorry, I am actually using reduceGroup, but it gives different results when I change the parallelism from 1 to 8. The results with parallelism 1 are the correct ones, and I want to run with parallelism 8.

I do not know how to get the same results with reduceGroup when I change the parallelism.

Thank you in advance!

Anissa

(In reply to Fabian Hueske's message of Thu, 22 Aug 2019, 10:29.)

Re: combineGroup get false results

Fabian Hueske-2
Hi Anissa,

This looks strange. If I understand your code correctly, your GroupReduce function is summing up a field.
Looking at the results that you posted, it seems as if there is some data missing (the total sum does not seem to match).

For groupReduce it is important that the grouping keys are deterministic. Since you provide a String array as key definition, there is no KeyExtractor function.
However, key attributes with non-deterministic hash values can cause random results.
What is the type of your key fields?
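
As a rough illustration of what non-deterministic hash values can look like on the JVM, the sketch below contrasts value-based hashes (String, Long) with identity-based ones (arrays, enums). It is plain Java; `partitionFor` is a hypothetical, simplified stand-in for hash partitioning and is not Flink's actual partitioner, and the `Region` enum is invented for the example.

```java
import java.util.Arrays;

public class KeyHashCheck {

    // Hypothetical key type, used only to illustrate identity-based hashing.
    enum Region { NORTH, SOUTH }

    // Simplified stand-in for hash partitioning; NOT Flink's actual partitioner.
    static int partitionFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        // String and Long hashes are computed from the value, so they are
        // identical in every JVM and every run.
        System.out.println(partitionFor("abc", 8)); // 2
        System.out.println(partitionFor(42L, 8));   // 2

        // Arrays and enums use Object.hashCode(), which is identity-based:
        // the value may change from one JVM or run to the next.
        String[] key = {"cluster 1"};
        System.out.println(key.hashCode());
        System.out.println(Region.NORTH.hashCode());

        // A content-based hash of an array with the same content is stable.
        String[] sameContent = {"cluster 1"};
        System.out.println(Arrays.hashCode(key) == Arrays.hashCode(sameContent)); // true
    }
}
```

If records were routed by an identity-based hash, equal keys could end up in different partitions on different machines, which would split a group. Whether that applies to a given job depends on how the framework hashes each key type.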

Another thing you might want to check is if the input (inputTable) to the groupReduce function is the same with both parallelism settings.

Best, Fabian

Re: combineGroup get false results

anissa moussaoui
Hi Fabian,

My GroupReduce function sums one column of the input rows of each group.

My key fields are of several types, in this case String and long. The results I posted are just a sample of the output dataset.

Thank you in advance!

Anissa

(In reply to Fabian Hueske's message of Thu, 22 Aug 2019, 11:24.)

Re: combineGroup get false results

Fabian Hueske-2
Hi,

If all key fields are primitive types (long) or String, their hash values should be deterministic.

There are two things that can go wrong:
1) Records are assigned to the wrong group.
2) The computation of a group is buggy.

I'd first check that 1) is correct.
Can you replace the sum function with a simple count and check if the counts for each group are the same for p=1 and p=8?
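
The comparison could be sketched as follows, in plain Java and outside Flink: count the records per group for each run, then list the groups whose counts differ. The class, the method names and the sample keys are invented for illustration; in the real job the counts would come from the two runs with p=1 and p=8.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class GroupCountCheck {

    // Counts records per grouping key; each String stands in for one record's key.
    static Map<String, Long> countPerGroup(List<String> keys) {
        Map<String, Long> counts = new TreeMap<>();
        for (String key : keys) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    // Returns the keys whose counts differ between the two runs,
    // including keys that appear in only one of them.
    static Set<String> mismatchedGroups(Map<String, Long> p1, Map<String, Long> p8) {
        Set<String> allKeys = new TreeSet<>(p1.keySet());
        allKeys.addAll(p8.keySet());
        Set<String> mismatched = new TreeSet<>();
        for (String key : allKeys) {
            if (!Objects.equals(p1.get(key), p8.get(key))) {
                mismatched.add(key);
            }
        }
        return mismatched;
    }

    public static void main(String[] args) {
        // Made-up keys: the second run has lost one record of group "A".
        Map<String, Long> p1 = countPerGroup(Arrays.asList("A", "A", "A", "B", "B"));
        Map<String, Long> p8 = countPerGroup(Arrays.asList("A", "A", "B", "B"));

        System.out.println(p1);                        // {A=3, B=2}
        System.out.println(p8);                        // {A=2, B=2}
        System.out.println(mismatchedGroups(p1, p8));  // [A]
    }
}
```

An empty result would suggest that records reach the right groups and that the problem lies in the computation (case 2); a non-empty result points at grouping or at the input (case 1).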



(In reply to anissa moussaoui's message of Thu, 22 Aug 2019, 11:45.)