Computing two aggregate functions on the same window

Frank Wilson
Hi,

In the DataStream API, is there a way to take two aggregate functions and apply them to the same window? The output would be a stream of 2-tuples containing the result of each aggregate function.

I feel it should be possible to combine previously written functions rather than writing a bespoke ‘god’ aggregate function for each pipeline.

Thanks, 

Frank

Re: Computing two aggregate functions on the same window

Chesnay Schepler
There doesn't seem to be a built-in way to apply multiple aggregations to a window.

You could use an aggregate function that combines other aggregate functions, but admittedly this will get unwieldy as the number of functions increases:
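import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Runs two AggregateFunctions side by side on the same input; the accumulator
// and the result are pairs of the inner accumulators and inner results.
public class MultiAggregateFunction<IN, ACC1, OUT1, ACC2, OUT2>
   implements AggregateFunction<IN, Tuple2<ACC1, ACC2>, Tuple2<OUT1, OUT2>> {

   private final AggregateFunction<IN, ACC1, OUT1> f1;
   private final AggregateFunction<IN, ACC2, OUT2> f2;

   public MultiAggregateFunction(AggregateFunction<IN, ACC1, OUT1> f1,
                                 AggregateFunction<IN, ACC2, OUT2> f2) {
      this.f1 = f1;
      this.f2 = f2;
   }

   @Override
   public Tuple2<ACC1, ACC2> createAccumulator() {
      return Tuple2.of(f1.createAccumulator(), f2.createAccumulator());
   }

   @Override
   public Tuple2<ACC1, ACC2> add(IN value, Tuple2<ACC1, ACC2> accumulator) {
      // Keep the accumulators that add() returns: a function with an immutable
      // accumulator (e.g. a Long counter) returns a new value instead of
      // updating the one passed in.
      return Tuple2.of(f1.add(value, accumulator.f0), f2.add(value, accumulator.f1));
   }

   @Override
   public Tuple2<OUT1, OUT2> getResult(Tuple2<ACC1, ACC2> accumulator) {
      return Tuple2.of(f1.getResult(accumulator.f0), f2.getResult(accumulator.f1));
   }

   @Override
   public Tuple2<ACC1, ACC2> merge(Tuple2<ACC1, ACC2> a, Tuple2<ACC1, ACC2> b) {
      // Used when windows are merged (e.g. session windows): merge each component.
      return Tuple2.of(f1.merge(a.f0, b.f0), f2.merge(a.f1, b.f1));
   }
}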
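A self-contained sketch of the same idea that runs without Flink on the classpath. `AggFn` and `Pair` are hypothetical stand-ins mirroring Flink's `AggregateFunction` (same four methods) and `Tuple2`; `Count`, `Sum` and the `aggregate` helper, which imitates a window feeding its elements to the function, are illustrative only. `Count` deliberately uses an immutable `Long` accumulator, which is why the combinator must keep the values returned by `add()`.

```java
import java.util.Arrays;
import java.util.List;

// Sketch only: AggFn and Pair are hypothetical stand-ins for Flink's
// AggregateFunction and Tuple2, so this file compiles without Flink.
public class MultiAggregateDemo {

    /** Hypothetical stand-in with the same four methods as Flink's AggregateFunction. */
    public interface AggFn<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    /** Minimal immutable pair standing in for Tuple2. */
    public static final class Pair<A, B> {
        public final A first;
        public final B second;
        public Pair(A first, B second) { this.first = first; this.second = second; }
        @Override public String toString() { return "(" + first + ", " + second + ")"; }
    }

    /** Counts elements; Long is immutable, so add() returns a new accumulator. */
    public static final class Count<T> implements AggFn<T, Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(T value, Long acc) { return acc + 1; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }

    /** Sums integers into a Long. */
    public static final class Sum implements AggFn<Integer, Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(Integer value, Long acc) { return acc + value; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }

    /** Runs two functions side by side; the accumulator pairs their accumulators. */
    public static final class Multi<IN, A1, O1, A2, O2>
            implements AggFn<IN, Pair<A1, A2>, Pair<O1, O2>> {
        private final AggFn<IN, A1, O1> f1;
        private final AggFn<IN, A2, O2> f2;

        public Multi(AggFn<IN, A1, O1> f1, AggFn<IN, A2, O2> f2) {
            this.f1 = f1;
            this.f2 = f2;
        }

        @Override public Pair<A1, A2> createAccumulator() {
            return new Pair<>(f1.createAccumulator(), f2.createAccumulator());
        }

        @Override public Pair<A1, A2> add(IN value, Pair<A1, A2> acc) {
            // Use the returned accumulators rather than assuming in-place mutation.
            return new Pair<>(f1.add(value, acc.first), f2.add(value, acc.second));
        }

        @Override public Pair<O1, O2> getResult(Pair<A1, A2> acc) {
            return new Pair<>(f1.getResult(acc.first), f2.getResult(acc.second));
        }

        @Override public Pair<A1, A2> merge(Pair<A1, A2> a, Pair<A1, A2> b) {
            return new Pair<>(f1.merge(a.first, b.first), f2.merge(a.second, b.second));
        }
    }

    /** Feeds a list through an AggFn, roughly as a window feeds its elements. */
    public static <IN, ACC, OUT> OUT aggregate(AggFn<IN, ACC, OUT> fn, List<IN> values) {
        ACC acc = fn.createAccumulator();
        for (IN v : values) {
            acc = fn.add(v, acc);
        }
        return fn.getResult(acc);
    }

    public static void main(String[] args) {
        Multi<Integer, Long, Long, Long, Long> countAndSum =
                new Multi<>(new Count<Integer>(), new Sum());
        System.out.println(aggregate(countAndSum, Arrays.asList(3, 4, 5))); // prints (3, 12)
    }
}
```

Running `main` prints `(3, 12)`: three elements whose sum is 12. If `add()` discarded the returned values, the `Count` component would stay at 0. In a real job the combined function would be passed to `WindowedStream.aggregate(...)`; because its type parameters are only fixed at the call site, Flink may fail to extract the accumulator and result types automatically, in which case the `aggregate` overload that also takes the accumulator and result `TypeInformation` explicitly may help.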

(In reply to Frank Wilson, 08/10/2019 12:09.)