Hi Urs,

a hash-partition operator should not spill. In general, DataSet plans aim to be as pipelined as possible.
There are a few cases in which spilling happens:

- full sort with insufficient memory
- hash tables that need to spill (only in join operators)
- range partitioning, to compute a histogram of the partitioning keys
- temp nodes to avoid deadlocks. These can occur in plans that branch and join later, like the following:

          /--- Map ---\
Input --<               JOIN --- Output
          \--- Map ---/

A simple plan without branching, such as the one you posted
  readCsvFile -> partitionBy(0) -> groupBy(0) -> sortGroup(0) -> first(n)
has no reason to spill except for the full sort that is required for the final aggregation.

Can you share the execution plan that you get for this job (ExecutionEnvironment.getExecutionPlan())?

Btw, the sortGroup(0) call is superfluous because it would sort a group, in which all 0-fields are the same, on the 0-field.
I believe Flink's optimizer automatically removes it, so it does not impact performance.
Sorting on another field would indeed make sense, because this would determine the order within a group and hence the records which are forwarded by First(n).

In order to force a combiner on a partitioned data set, you can do the following:

--------

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public static void main(String[] args) throws Exception {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  // randData(env) is not shown in this mail; any DataSet<Tuple2<Long, Long>> source works here
  DataSet<Tuple2<Long, Long>> data = randData(env);

  DataSet<Tuple2<Long, Long>> result = data.partitionByHash(0)
    // combine on the already partitioned data
    .groupBy(0).combineGroup(new First3())
    // declare that field 0 passes through unchanged, so the partitioning can be reused
    .withForwardedFields("f0")
    // final aggregation
    .groupBy(0).reduceGroup(new First3());

  result.print();
}

// Emits the first three records of each group; used as both combiner and reducer.
public static class First3 implements
    GroupCombineFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>,
    GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

  @Override
  public void combine(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
    reduce(values, out);
  }

  @Override
  public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
    int i = 0;
    for (Tuple2<Long, Long> v : values) {
      out.collect(v);
      i++;
      if (i == 3) {
        break;
      }
    }
  }
}

--------

The generated plan will
- hash partition the input data
- partially sort the data in memory on the first field (not going to disk)
- invoke the combiner for each in-memory sorted group
- locally forward the data (because of the forwarded field information [1])
- fully sort the data
- invoke the group reducer for each group

In this plan, the only spilling should happen in the sort for the final aggregation.

Best, Fabian
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#semantic-annotations
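
As a side note on why a first-n function can serve as both combiner and reducer: applying it to partial chunks first and then to the combined output should give the same records as applying it once to everything. The sketch below checks this in plain Java without any Flink classes. The class name, the helper methods, and the use of long[] {key, value} as a stand-in for Tuple2<Long, Long> are all assumptions made for illustration; it is not how Flink implements combining.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; no Flink classes are involved.
// A record is a long[] {key, value}, standing in for Tuple2<Long, Long>.
class FirstNCombineSketch {

    // Keeps at most n values per key, in encounter order.
    static Map<Long, List<Long>> firstN(List<long[]> records, int n) {
        Map<Long, List<Long>> out = new LinkedHashMap<>();
        for (long[] r : records) {
            List<Long> kept = out.computeIfAbsent(r[0], k -> new ArrayList<>());
            if (kept.size() < n) {
                kept.add(r[1]);
            }
        }
        return out;
    }

    // Turns a per-key result back into a flat list of {key, value} records.
    static List<long[]> flatten(Map<Long, List<Long>> grouped) {
        List<long[]> records = new ArrayList<>();
        for (Map.Entry<Long, List<Long>> e : grouped.entrySet()) {
            for (long v : e.getValue()) {
                records.add(new long[] {e.getKey(), v});
            }
        }
        return records;
    }

    // "Combine" each chunk on its own, then "reduce" the concatenated partial results.
    static Map<Long, List<Long>> combineThenReduce(List<List<long[]>> chunks, int n) {
        List<long[]> combined = new ArrayList<>();
        for (List<long[]> chunk : chunks) {
            combined.addAll(flatten(firstN(chunk, n)));
        }
        return firstN(combined, n);
    }

    public static void main(String[] args) {
        List<List<long[]>> chunks = Arrays.asList(
            Arrays.asList(new long[] {1, 10}, new long[] {1, 11}, new long[] {2, 20}),
            Arrays.asList(new long[] {1, 12}, new long[] {1, 13}, new long[] {2, 21}),
            Arrays.asList(new long[] {1, 14}));

        List<long[]> all = new ArrayList<>();
        for (List<long[]> chunk : chunks) {
            all.addAll(chunk);
        }

        // Both lines should print the same per-key selection.
        System.out.println(firstN(all, 3));
        System.out.println(combineThenReduce(chunks, 3));
    }
}
```

The combine step never drops a record that the final first-n would have kept, which is what makes the early reduction safe. Which records count as "first" still depends on the input order, so a sort on another field changes the selection, as noted above.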
2017-09-05 22:21 GMT+02:00 Newport, Billy <[hidden email]>:

We have the same issue. We are finding that we cannot express the data flow in a natural way because of unnecessary spilling. Instead, we're making our own operators which combine multiple steps together and essentially hide them from Flink, OR sometimes we even have to read an input dataset once per flow to avoid spilling. The performance improvements are dramatic, but it's kind of reducing Flink to a thread scheduler rather than a data flow engine, because we basically cannot express the flow to Flink. This worries us because if we let others write Flink code using our infra, we'll be spending all our time collapsing their flows into much simpler but less intuitive flows to prevent Flink from spilling.
This also means higher-level APIs such as the Table API or Beam are off the table, because they prevent us from optimizing in this manner.
We already have prior implementations of the logic we are implementing in Flink, and as a result we know the Flink version is much less efficient than those prior implementations. That is giving us pause about rolling it out more broadly: we're afraid of a Flink tax, in effect, from a performance point of view as well as from a usability point of view, given that naïve flows are not performant without significant collapsing.
For example, we see spilling here:
Dataset -> Map -> Filter -> Map -> Output
We're now trying to combine the Map -> Output into the filter operation, so that the records which are not passed through are written to an output file during the Filter.
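
To make the workaround concrete, here is a minimal plain-Java sketch of a filter that routes rejected records to a second sink during the same pass. It does not use Flink; the class name and the filterWithSideOutput helper are hypothetical, and the in-memory list only stands in for the output file mentioned above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java illustration only; the helper name and signature are hypothetical.
class FilterWithSideOutputSketch {

    // Returns the records that pass the predicate and appends every
    // rejected record to `rejected` in the same pass over the input.
    static <T> List<T> filterWithSideOutput(Iterable<T> input, Predicate<T> keep, List<T> rejected) {
        List<T> passed = new ArrayList<>();
        for (T record : input) {
            if (keep.test(record)) {
                passed.add(record);
            } else {
                rejected.add(record); // stand-in for writing to the side output file
            }
        }
        return passed;
    }

    public static void main(String[] args) {
        List<Integer> rejected = new ArrayList<>();
        List<Integer> passed = filterWithSideOutput(Arrays.asList(1, 2, 3, 4, 5), x -> x % 2 == 0, rejected);
        System.out.println("passed:   " + passed);
        System.out.println("rejected: " + rejected);
    }
}
```

The point of the collapse is that the input is traversed once and there is no branch in the plan, at the cost of hiding the second output from the engine.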
Or in this case
Dataset -> Map -> [FilterT -> CoGroup ; FilterF] -> Map -> Output
Rewriting as
Dataset -> Map -> FilterT -> CoGroup -> Map -> Output
Dataset -> Map -> FilterF -> Map -> Output
That is, running two separate flows, and therefore reading the file twice rather than once, is several times faster.
This is all pretty unintuitive and makes using Flink pretty difficult for us, never mind our users. Writing the Flink dataflows in a naïve way is fast, but getting them to run with acceptable efficiency requires obscure workarounds and collapsing. That takes the bulk of our time, which is a shame, and it is the main reason we don't want to push it out for general use yet.
It seems like it badly needs a flow rewriter which is capable of rewriting a naïve flow to use operators or restructured flows automatically. We're doing it by hand right now but there has to be a better way.
It's a shame really, it's so close.
Billy
-----Original Message-----
From: Urs Schoenenberger [mailto:[hidden email]]
Sent: Tuesday, September 05, 2017 6:30 AM
To: user
Subject: DataSet: partitionByHash without materializing/spilling the entire partition?
Hi all,
we have a DataSet pipeline which reads CSV input data and then
essentially does a combinable GroupReduce via first(n).
In our first iteration (readCsvFile -> groupBy(0) -> sortGroup(0) ->
first(n)), we got a jobgraph like this:
source --[Forward]--> combine --[Hash Partition on 0, Sort]--> reduce
This works, but we found the combine phase to be inefficient because not
enough combinable elements fit into a sorter. My idea was to
pre-partition the DataSet to increase the chance of combinable elements
(readCsvFile -> partitionBy(0) -> groupBy(0) -> sortGroup(0) -> first(n)).
To my surprise, I found that this changed the job graph to
source --[Hash Partition on 0]--> partition(noop) --[Forward]--> combine
--[Hash Partition on 0, Sort]--> reduce
while materializing and spilling the entire partitions at the
partition(noop)-Operator!
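
To illustrate the motivation for pre-partitioning, the toy model below counts how many records a first(n) combiner emits when it only sees a fixed number of records at a time. It is a plain-Java sketch, not Flink code: the class and method names are made up, the fixed-size buffer is a simplified stand-in for the in-memory sorter, and key % tasks stands in for hash partitioning.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only; a fixed record-count buffer stands in for the combiner's sorter.
class CombinerBufferSketch {

    // Generates `count` keys cycling through 0..numKeys-1.
    static List<Integer> roundRobinKeys(int count, int numKeys) {
        List<Integer> keys = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            keys.add(i % numKeys);
        }
        return keys;
    }

    // Counts the records a first(n) combiner emits when it processes
    // the input in independent buffers of `bufferSize` records.
    static int combinedCount(List<Integer> keys, int bufferSize, int n) {
        int emitted = 0;
        for (int start = 0; start < keys.size(); start += bufferSize) {
            int end = Math.min(start + bufferSize, keys.size());
            Map<Integer, Integer> seen = new HashMap<>();
            for (int key : keys.subList(start, end)) {
                if (seen.merge(key, 1, Integer::sum) <= n) {
                    emitted++;
                }
            }
        }
        return emitted;
    }

    // No partitioning: every task gets a contiguous slice containing all keys.
    static List<List<Integer>> sliceContiguously(List<Integer> keys, int tasks) {
        List<List<Integer>> slices = new ArrayList<>();
        int sliceSize = (keys.size() + tasks - 1) / tasks;
        for (int t = 0; t < tasks; t++) {
            int from = Math.min(t * sliceSize, keys.size());
            int to = Math.min(from + sliceSize, keys.size());
            slices.add(keys.subList(from, to));
        }
        return slices;
    }

    // Pre-partitioning: every key is routed to exactly one task.
    static List<List<Integer>> partitionByKey(List<Integer> keys, int tasks) {
        List<List<Integer>> parts = new ArrayList<>();
        for (int t = 0; t < tasks; t++) {
            parts.add(new ArrayList<>());
        }
        for (int key : keys) {
            parts.get(Math.floorMod(key, tasks)).add(key);
        }
        return parts;
    }

    // Sums the combiner output over all tasks.
    static int totalEmitted(List<List<Integer>> perTask, int bufferSize, int n) {
        int total = 0;
        for (List<Integer> taskInput : perTask) {
            total += combinedCount(taskInput, bufferSize, n);
        }
        return total;
    }

    public static void main(String[] args) {
        List<Integer> keys = roundRobinKeys(10_000, 100);
        System.out.println("unpartitioned: " + totalEmitted(sliceContiguously(keys, 4), 100, 3));
        System.out.println("partitioned:   " + totalEmitted(partitionByKey(keys, 4), 100, 3));
    }
}
```

With 100 distinct keys, 4 tasks, a buffer of 100 records and n = 3, the unpartitioned combiner in this model emits all 10000 records, because each buffer holds every key only once. After partitioning, each buffer holds 25 keys four times each, so the combiner emits 7500. Real data and a real sorter will behave differently; the model only shows the direction of the effect.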
Is there any way I can partition the data on the way from source to
combine without spilling? That is, can I get a job graph that looks like
source --[Hash Partition on 0]--> combine --[Hash Partition on 0,
Sort]--> reduce
instead?
Thanks,
Urs
--
Urs Schönenberger - [hidden email]
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
Sitz: Unterföhring * Amtsgericht München * HRB 135082