Extending stream events with an aggregate value

Nicholas Walton
I’m sure I’m being a complete idiot, since this seems so trivial, but if someone could point me in the right direction I’d be very grateful.

I have a simple data stream [(Int, Double)] keyed on the Int. I can calculate the running max of the stream no problem using “.max(2)”. But I want to output the original input together with the running max value as [(Int, Double, Double)]. I’ve hunted high and low for a means to do something so trivial.

Nick Walton

Re: Extending stream events with an aggregate value

Piotr Nowojski
Hi,

No worries :) You probably need to write your own process function to do exactly that, maybe something like this:

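import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

DataStream<Tuple2<Long, Double>> test; // your input stream, assigned elsewhere

DataStream<Tuple3<Long, Double, Double>> max = test.keyBy(0)
      .process(new KeyedProcessFunction<Tuple, Tuple2<Long, Double>, Tuple3<Long, Double, Double>>() {
         // Per-key running max; keyed state is checkpointed, so it can survive restarts
         private transient ValueState<Double> maxState;

         @Override
         public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Double> descriptor =
                  new ValueStateDescriptor<>("max", TypeInformation.of(new TypeHint<Double>() {}));
            maxState = getRuntimeContext().getState(descriptor);
         }

         @Override
         public void processElement(Tuple2<Long, Double> value, Context ctx, Collector<Tuple3<Long, Double, Double>> out) throws Exception {
            Double currentMax = maxState.value(); // null before this key's first element
            if (currentMax == null || value.f1 > currentMax) {
               currentMax = value.f1;
               maxState.update(currentMax);
            }
            // Emit the original (key, value) together with the running max
            out.collect(Tuple3.of(value.f0, value.f1, currentMax));
         }
      });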
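To make the per-key logic of processElement concrete without a Flink cluster, here is a plain-Java sketch (not Flink code) that replays a small, made-up input. The class RunningMaxSketch, the Enriched record and the sample data are hypothetical; the HashMap only stands in for keyed ValueState, which, unlike a local map, Flink scopes to the current key and can checkpoint and restore. Records need Java 16 or later.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java simulation of the keyed running max above.
// The HashMap plays the role of Flink's keyed ValueState: one "max so far" per key.
public class RunningMaxSketch {

    // Hypothetical output triple: (key, original value, running max for that key)
    public record Enriched(long key, double value, double runningMax) {}

    public static List<Enriched> enrich(List<Map.Entry<Long, Double>> input) {
        Map<Long, Double> maxPerKey = new HashMap<>();
        List<Enriched> out = new ArrayList<>();
        for (Map.Entry<Long, Double> e : input) {
            // merge stores the value on first sight of a key, otherwise keeps the larger one,
            // and returns the value now associated with the key (the running max)
            double currentMax = maxPerKey.merge(e.getKey(), e.getValue(), Math::max);
            out.add(new Enriched(e.getKey(), e.getValue(), currentMax));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, Double>> input = List.of(
                Map.entry(1L, 3.0), Map.entry(1L, 1.0),
                Map.entry(2L, 2.0), Map.entry(1L, 5.0));
        enrich(input).forEach(System.out::println);
    }
}
```

Each input element is emitted unchanged plus a third field; key 2 gets its own independent maximum, which is exactly what keying the stream buys you.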

You need to store the max in state if you care about recovering from failures/restarts without losing the previous max value. Please check the online documentation for ProcessFunction and for handling state in Flink :)

Piotrek

> On 6 Jun 2018, at 15:55, Nicholas Walton wrote:
>
> I’m sure I’m being a complete idiot, since this seems so trivial but if someone could point me in the right direction I’d be very grateful.
>
> I have a simple data stream [(Int, Double)] keyed on the Int. I can calculate the running max of the stream no problem using “.max(2)”. But I want to output the original input together with the running max value as [(Int, Double, Double)]. I’ve hunted high and low for a means to do something so trivial.
>
> Nick Walton