There doesn't seem to be a built-in way
to apply multiple aggregations to a window.
You could use an aggregate function
that combines other aggregate functions, but admittedly this will
get unwieldy as the number of functions increases:
// Imports needed at the top of the enclosing file:
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Declared as a nested class of your job class.
public static class MultiAggregateFunction<
        IN,
        ACC1, OUT1, F1 extends AggregateFunction<IN, ACC1, OUT1>,
        ACC2, OUT2, F2 extends AggregateFunction<IN, ACC2, OUT2>>
        implements AggregateFunction<IN, Tuple2<ACC1, ACC2>, Tuple2<OUT1, OUT2>> {

    private final F1 f1;
    private final F2 f2;

    public MultiAggregateFunction(F1 f1, F2 f2) {
        this.f1 = f1;
        this.f2 = f2;
    }

    @Override
    public Tuple2<ACC1, ACC2> createAccumulator() {
        return Tuple2.of(f1.createAccumulator(), f2.createAccumulator());
    }

    @Override
    public Tuple2<ACC1, ACC2> add(IN value, Tuple2<ACC1, ACC2> accumulator) {
        // add() returns the updated accumulator, which may be a new object
        // (e.g. an immutable Long), so keep the returned values.
        return Tuple2.of(
                f1.add(value, accumulator.f0),
                f2.add(value, accumulator.f1));
    }

    @Override
    public Tuple2<OUT1, OUT2> getResult(Tuple2<ACC1, ACC2> accumulator) {
        return Tuple2.of(f1.getResult(accumulator.f0), f2.getResult(accumulator.f1));
    }

    @Override
    public Tuple2<ACC1, ACC2> merge(Tuple2<ACC1, ACC2> a, Tuple2<ACC1, ACC2> b) {
        return Tuple2.of(f1.merge(a.f0, b.f0), f2.merge(a.f1, b.f1));
    }
}
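
To make the combining idea concrete without a Flink dependency, here is a minimal sketch that runs on plain Java. Everything in it is an assumption for illustration: Agg is a local stand-in with the same four methods as Flink's AggregateFunction, Pair stands in for Tuple2, and Sum, Count and Both are hypothetical names. The accumulators are immutable Long values, so the sketch relies on the value returned by each add call.

```java
import java.util.List;

public class MultiAggregateSketch {

    // Local stand-in (assumption) mirroring the four methods of Flink's AggregateFunction.
    interface Agg<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Minimal immutable pair standing in for Tuple2 (assumption).
    static final class Pair<A, B> {
        final A first;
        final B second;
        Pair(A first, B second) { this.first = first; this.second = second; }
    }

    // Hypothetical aggregate: sum of all values.
    static final class Sum implements Agg<Long, Long, Long> {
        public Long createAccumulator() { return 0L; }
        public Long add(Long value, Long acc) { return acc + value; }
        public Long getResult(Long acc) { return acc; }
        public Long merge(Long a, Long b) { return a + b; }
    }

    // Hypothetical aggregate: number of values seen.
    static final class Count implements Agg<Long, Long, Long> {
        public Long createAccumulator() { return 0L; }
        public Long add(Long value, Long acc) { return acc + 1; }
        public Long getResult(Long acc) { return acc; }
        public Long merge(Long a, Long b) { return a + b; }
    }

    // Combines two aggregates by delegating each method to both of them.
    static final class Both<IN, A1, O1, A2, O2> implements Agg<IN, Pair<A1, A2>, Pair<O1, O2>> {
        private final Agg<IN, A1, O1> f1;
        private final Agg<IN, A2, O2> f2;
        Both(Agg<IN, A1, O1> f1, Agg<IN, A2, O2> f2) { this.f1 = f1; this.f2 = f2; }

        public Pair<A1, A2> createAccumulator() {
            return new Pair<>(f1.createAccumulator(), f2.createAccumulator());
        }
        public Pair<A1, A2> add(IN value, Pair<A1, A2> acc) {
            // Keep the returned accumulators; they may be new objects.
            return new Pair<>(f1.add(value, acc.first), f2.add(value, acc.second));
        }
        public Pair<O1, O2> getResult(Pair<A1, A2> acc) {
            return new Pair<>(f1.getResult(acc.first), f2.getResult(acc.second));
        }
        public Pair<A1, A2> merge(Pair<A1, A2> a, Pair<A1, A2> b) {
            return new Pair<>(f1.merge(a.first, b.first), f2.merge(a.second, b.second));
        }
    }

    // Folds the values through the combined aggregate, as a window would.
    static Pair<Long, Long> aggregate(List<Long> values) {
        Agg<Long, Pair<Long, Long>, Pair<Long, Long>> both = new Both<>(new Sum(), new Count());
        Pair<Long, Long> acc = both.createAccumulator();
        for (Long v : values) {
            acc = both.add(v, acc);
        }
        return both.getResult(acc);
    }

    public static void main(String[] args) {
        Pair<Long, Long> r = aggregate(List.of(3L, 4L, 5L));
        System.out.println(r.first + ", " + r.second); // prints "12, 3"
    }
}
```

In a real job the combined function would be passed to the window's aggregate(...) call in place of a single AggregateFunction, and more than two functions could be handled by nesting one combiner inside another.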
On 08/10/2019 12:09, Frank Wilson wrote:
Hi,
In the DataStream API, is there a way to take two
aggregate functions and apply them to the same window? The
output would be a stream of 2-tuples containing the result of
each aggregate function.
I feel it should be possible to combine previously
written functions rather than writing a bespoke ‘god’ aggregate
function for each pipeline.
Thanks,
Frank