Hi Urs,
a hash-partition operator should not spill. In general, DataSet plans aim to be as pipelined as possible.
There are a few cases when spilling happens:
- a full sort with insufficient memory
- hash tables that need to spill (only in join operators)
- range partitioning, to compute a histogram of the partitioning keys
- temp nodes to avoid deadlocks; these can occur in plans that branch and join later, like the following:
/--- Map ---\
Input --< JOIN --- Output
\--- Map ---/
A simple plan without branching, such as the one you posted,
readCsvFile -> partitionBy(0) -> groupBy(0) -> sortGroup(0) -> first(n)
has no reason to spill except for the full sort that is required for the final aggregation.
Can you share the execution plan that you get from ExecutionEnvironment.getExecutionPlan()?
Btw, the sortGroup(0) call is superfluous, because it would sort each group on the 0-field, although all 0-fields within a group are identical.
I believe Flink's optimizer automatically removes it, so it does not impact performance.
Sorting on another field would indeed make sense, because this determines the order within each group and hence which records are forwarded by first(n).
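To make this concrete, here is a small standalone sketch (plain Java, no Flink involved; the data and the groupFirstN helper are made up for illustration) that mimics what groupBy(0).sortGroup(1, Order.ASCENDING).first(3) would emit: within each group on the first field, records are ordered by the second field, and only the first three survive.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class FirstNExample {

    // Take the first n records of each group, after sorting each group on
    // the second field. Mimics groupBy(0).sortGroup(1).first(n) semantics.
    static List<long[]> groupFirstN(List<long[]> records, int n) {
        // group on field 0 (TreeMap only to get deterministic group order here)
        Map<Long, List<long[]>> groups = records.stream()
                .collect(Collectors.groupingBy(r -> r[0], TreeMap::new, Collectors.toList()));

        List<long[]> out = new ArrayList<>();
        for (List<long[]> group : groups.values()) {
            group.sort(Comparator.comparingLong(r -> r[1])); // like sortGroup(1)
            group.stream().limit(n).forEach(out::add);       // like first(n)
        }
        return out;
    }

    public static void main(String[] args) {
        List<long[]> data = new ArrayList<>(List.of(
                new long[]{1, 9}, new long[]{1, 2}, new long[]{1, 5}, new long[]{1, 7},
                new long[]{2, 4}, new long[]{2, 1}));

        // emits (1,2) (1,5) (1,7) for group 1 and (2,1) (2,4) for group 2
        for (long[] r : groupFirstN(data, 3)) {
            System.out.println(r[0] + "," + r[1]);
        }
    }
}
```

Without the sort, which three records of group 1 are forwarded would depend on arrival order; with it, the result is well-defined.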
In order to force a combiner on a partitioned data set, you can do the following:
--------
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public static void main(String[] args) throws Exception {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  DataSet<Tuple2<Long, Long>> data = randData(env);

  DataSet<Tuple2<Long, Long>> result = data
      .partitionByHash(0)
      // local pre-aggregation before the shuffle-free grouping
      .groupBy(0).combineGroup(new First3())
          .withForwardedFields("f0")
      .groupBy(0).reduceGroup(new First3());

  result.print();
}

// Emits at most the first three records of each group.
public static class First3 implements
    GroupCombineFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>,
    GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

  @Override
  public void combine(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
    reduce(values, out);
  }

  @Override
  public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
    int i = 0;
    for (Tuple2<Long, Long> v : values) {
      out.collect(v);
      i++;
      if (i == 3) {
        break;
      }
    }
  }
}