Hi,
I used the combineGroup function to reduce the groups of a very large dataset. With parallelism set to 1 I get different results than with parallelism set to 8, and the correct results are the ones obtained with parallelism 1. I also used the Table API to group the dataset and select the sum of a column, and with parallelism 8 it gave the same result as combineGroup.

My combineGroup function:

    import java.util.Iterator;

    import org.apache.flink.api.common.functions.GroupReduceFunction;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;

    // indexField is assumed to be inherited from Resampling
    public class SumGroup extends Resampling implements GroupReduceFunction<Row, Row> {

        @Override
        public void reduce(Iterable<Row> values, Collector<Row> out) throws Exception {
            Iterator<Row> itr = values.iterator();
            double sum = 0;
            Row row = null;
            // Sum the field at indexField over all rows of the group
            while (itr.hasNext()) {
                row = itr.next();
                sum = sum + (double) row.getField(indexField);
            }
            // Emit the last row of the group, with the group sum written into indexField
            row.setField(indexField, sum);
            out.collect(row);
        }
    }

I call this function like this:

    DataSet<Row> ds = env.toDataSet(inputTable(env), Row.class)
            .groupBy(groupFields.stream().toArray(String[]::new))
            .sortGroup(sortField, Order.ASCENDING)
            .reduceGroup(new SumGroup())
            .returns(getOutputType());

Results with parallelism 1:
Results with parallelism 8:
I do not know how to make the results independent of the parallelism, i.e. get the same results with parallelism 8 as with parallelism 1. Does anyone have an idea how to fix this? Thank you in advance!

Anissa
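For comparison, the group-wise sum that SumGroup computes can be modelled in plain Java, independent of Flink and of the parallelism setting. This is a minimal sketch, assuming each row is an `Object[]` standing in for a Flink `Row` and that the key and value fields are given by position; the class `GroupSumSketch` and method `groupSum` are hypothetical names. Running it on a sample of the input gives a baseline to check both Flink runs against.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model of the SumGroup logic: group rows by key fields, sum one column.
 * Object[] is a stand-in for org.apache.flink.types.Row; this is not Flink code.
 */
final class GroupSumSketch {

    /** Groups rows by the fields at keyIdx and sums the numeric field at sumIdx. */
    static Map<List<Object>, Double> groupSum(List<Object[]> rows, int[] keyIdx, int sumIdx) {
        Map<List<Object>, Double> sums = new LinkedHashMap<>();
        for (Object[] row : rows) {
            // Composite key; List.equals/hashCode compare element values, not identity.
            List<Object> key = new ArrayList<>(keyIdx.length);
            for (int i : keyIdx) {
                key.add(row[i]);
            }
            // Number.doubleValue() accepts Long, Integer, Double, ... boxed values.
            double value = ((Number) row[sumIdx]).doubleValue();
            sums.merge(key, value, Double::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Object[]> rows = Arrays.asList(
                new Object[] {"a", 1L, 2.0},
                new Object[] {"a", 1L, 3.0},
                new Object[] {"b", 2L, 4.0});
        // Prints {[a, 1]=5.0, [b, 2]=4.0}
        System.out.println(groupSum(rows, new int[] {0, 1}, 2));
    }
}
```

Because the result depends only on the rows and not on how they are split, any difference between this baseline and a Flink run points at the input or the grouping rather than the summation.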
Hi Anissa,

Are you using combineGroup or reduceGroup? Your question refers to combineGroup, but the code only shows reduceGroup. combineGroup is non-deterministic by design, to enable efficient partial results without network and disk IO. reduceGroup is deterministic given a deterministic key extractor and a deterministic GroupReduceFunction.

Hope this helps,
Fabian

On Tue, 20 Aug 2019 at 14:21, anissa moussaoui <[hidden email]> wrote:
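The difference between a combine step and a reduce step can be illustrated with a simplified plain-Java model: a combiner only sums within each parallel partition, so the number of records it emits depends on how the data is split, while a full reduce over all records does not. This is a sketch under stated assumptions: records are distributed round-robin (a hypothetical stand-in for Flink's actual distribution), and `CombineVsReduceSketch`, `split`, `combine` and `reduce` are hypothetical names. Real Flink combiners may additionally emit partial results depending on available memory.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CombineVsReduceSketch {

    /** A (key, value) record, standing in for a Flink Row. */
    static final class Rec {
        final String key;
        final double value;

        Rec(String key, double value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Splits the input round-robin over p partitions (assumed distribution). */
    static List<List<Rec>> split(List<Rec> input, int p) {
        List<List<Rec>> parts = new ArrayList<>();
        for (int i = 0; i < p; i++) {
            parts.add(new ArrayList<>());
        }
        for (int i = 0; i < input.size(); i++) {
            parts.get(i % p).add(input.get(i));
        }
        return parts;
    }

    /** Combine: sums per key inside each partition only, one partial record per key and partition. */
    static List<Rec> combine(List<List<Rec>> parts) {
        List<Rec> out = new ArrayList<>();
        for (List<Rec> part : parts) {
            Map<String, Double> local = new LinkedHashMap<>();
            for (Rec r : part) {
                local.merge(r.key, r.value, Double::sum);
            }
            local.forEach((k, v) -> out.add(new Rec(k, v)));
        }
        return out;
    }

    /** Reduce: sums per key over all records, regardless of which partition they came from. */
    static Map<String, Double> reduce(List<Rec> records) {
        Map<String, Double> total = new LinkedHashMap<>();
        for (Rec r : records) {
            total.merge(r.key, r.value, Double::sum);
        }
        return total;
    }

    public static void main(String[] args) {
        List<Rec> input = new ArrayList<>();
        for (int i = 0; i < 16; i++) {
            input.add(new Rec(i % 2 == 0 ? "x" : "y", 1.0));
        }
        for (int p : new int[] {1, 8}) {
            List<Rec> partial = combine(split(input, p));
            // p=1: 2 partial records; p=8: 8 partial records; reduce gives {x=8.0, y=8.0} in both cases
            System.out.println("p=" + p + " combine emits " + partial.size()
                    + " records; reduce gives " + reduce(partial));
        }
    }
}
```

Reading only the combiner output would therefore give parallelism-dependent results, whereas a reduce over the partials gives the same totals for any p.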
Thanks for your feedback! Sorry, I actually used reduceGroup, but it gives different results when I change the parallelism to 8 (or anything above 1). The correct results are the ones with parallelism 1, and I want to set it to 8. I do not know how to get the same results with reduceGroup when changing the parallelism.

Thank you in advance!
Anissa

On Thu, 22 Aug 2019 at 10:29, Fabian Hueske <[hidden email]> wrote:
Hi Anissa,

This looks strange. If I understand your code correctly, your GroupReduce function is summing up a field. Looking at the results that you posted, it seems as if some data is missing (the total sum does not seem to match). For groupReduce it is important that the grouping keys are deterministic. Since you provide a String array as the key definition, there is no KeyExtractor function. However, key attributes with non-deterministic hash values can still cause random results. What is the type of your key fields? Another thing you might want to check is whether the input (inputTable) to the groupReduce function is the same with both parallelism settings.

Best,
Fabian
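Why key hashes must be deterministic can be sketched in plain Java: hash partitioning sends a record to the instance chosen from its key's hash, and grouping relies on equal keys being recognized as equal. The model below uses `hashCode()` modulo p as a simplified partitioner (Flink's actual partitioner and key comparators differ in detail) and two hypothetical key classes: one with value-based `equals`/`hashCode` like `String` or `Long`, and one relying on `Object`'s identity-based defaults. All class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class KeyHashSketch {

    /** Key type WITHOUT value-based equals/hashCode: every instance is a distinct key. */
    static final class IdentityKey {
        final String name;

        IdentityKey(String name) {
            this.name = name;
        }
    }

    /** Key type WITH value-based equals/hashCode, comparable to a (String, Long) key. */
    static final class ValueKey {
        final String name;
        final long id;

        ValueKey(String name, long id) {
            this.name = name;
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ValueKey)) {
                return false;
            }
            ValueKey k = (ValueKey) o;
            return id == k.id && name.equals(k.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, id);
        }
    }

    /** Simplified hash partitioning: the same hash always yields the same partition. */
    static int partition(Object key, int p) {
        return Math.floorMod(key.hashCode(), p);
    }

    /** Counts how many distinct groups a hash map forms for three logically equal keys. */
    static int groupsFor(Object k1, Object k2, Object k3) {
        Map<Object, Integer> counts = new HashMap<>();
        for (Object k : new Object[] {k1, k2, k3}) {
            counts.merge(k, 1, Integer::sum);
        }
        return counts.size();
    }

    public static void main(String[] args) {
        // Value-based key: the three records form 1 group (prints 1)
        System.out.println(groupsFor(new ValueKey("a", 1L), new ValueKey("a", 1L), new ValueKey("a", 1L)));
        // Identity-based key: the three records form 3 groups (prints 3)
        System.out.println(groupsFor(new IdentityKey("a"), new IdentityKey("a"), new IdentityKey("a")));
    }
}
```

`String.hashCode()` and `Long.hashCode()` are fully specified by the JDK, so keys built from them are routed the same way in every run; an identity-based hash can differ between JVMs and runs, which would scatter one logical group across instances.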
Hi Fabian,

My GroupReduce function sums one column of the input rows in each group. My key fields are of multiple types, in this case String and Long. The results I posted are just a sample of the output dataset.

Thank you in advance!
Anissa

On Thu, 22 Aug 2019 at 11:24, Fabian Hueske <[hidden email]> wrote:
Hi,

If all key fields are primitive types (long) or String, their hash values should be deterministic.
There are two things that can go wrong:
1) Records are assigned to the wrong group.
2) The computation of a group is buggy.

I'd first check that 1) is correct. Can you replace the sum function with a simple count and check whether the counts for each group are the same for p=1 and p=8?

On Thu, 22 Aug 2019 at 11:45, anissa moussaoui <[hidden email]> wrote:
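The count check suggested above can be sketched as: count the records per group for both parallelism settings and report the keys whose counts disagree. In this plain-Java sketch both runs are simulated with a simplified hash partitioner on String keys; in the real job the two count maps would instead be loaded from the outputs of the p=1 and p=8 runs, which is an assumption about how they are collected. `GroupCountCheck`, `countPerGroup` and `diff` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

final class GroupCountCheck {

    /** Counts records per key after hash-partitioning the keys over p parallel instances. */
    static Map<String, Long> countPerGroup(List<String> keys, int p) {
        List<Map<String, Long>> perInstance = new ArrayList<>();
        for (int i = 0; i < p; i++) {
            perInstance.add(new HashMap<>());
        }
        for (String k : keys) {
            // Simplified stand-in for Flink's hash partitioning
            int target = Math.floorMod(k.hashCode(), p);
            perInstance.get(target).merge(k, 1L, Long::sum);
        }
        // Each key lives in exactly one instance, so merging is a plain union.
        Map<String, Long> all = new HashMap<>();
        for (Map<String, Long> m : perInstance) {
            all.putAll(m);
        }
        return all;
    }

    /** Returns the keys whose counts differ between two runs, including keys missing from one run. */
    static Set<String> diff(Map<String, Long> a, Map<String, Long> b) {
        Set<String> keys = new TreeSet<>(a.keySet());
        keys.addAll(b.keySet());
        keys.removeIf(k -> Objects.equals(a.get(k), b.get(k)));
        return keys;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "a", "c", "b", "a");
        Map<String, Long> p1 = countPerGroup(keys, 1);
        Map<String, Long> p8 = countPerGroup(keys, 8);
        // Prints "Keys with different counts: []" because the keys hash deterministically
        System.out.println("Keys with different counts: " + diff(p1, p8));
    }
}
```

An empty diff on the real outputs would rule out problem 1) and point at the computation inside the group; a non-empty diff lists exactly the groups whose records were split or lost.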