Stashing key with AggregateFunction

Ken Krugler
Hi list,

I was trying different ways to implement a moving average (count-based, not time-based).

The blunt-instrument approach is to create a custom FlatMapFunction that keeps track of the last N values.
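A minimal sketch of that FlatMapFunction-style approach follows. It is written in plain Java so that it runs without Flink. The class name LastNMovingAverage and its update() method are hypothetical, and keys are assumed to be Strings. The HashMap stands in for the per-key state that a real FlatMapFunction would have to keep, for example in Flink keyed state.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the "blunt instrument": per key, remember the last N values
// and emit their average for every incoming record. In a Flink job this state would
// live in keyed state inside a FlatMapFunction; the HashMap is only a stand-in.
class LastNMovingAverage {
    private final int windowSize;                           // N, the number of values averaged
    private final Map<String, Deque<Double>> lastValues = new HashMap<>();

    LastNMovingAverage(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.windowSize = windowSize;
    }

    // Records a value for the key and returns the average of that key's last N values.
    double update(String key, double value) {
        Deque<Double> values = lastValues.computeIfAbsent(key, k -> new ArrayDeque<>());
        values.addLast(value);
        if (values.size() > windowSize) {
            values.removeFirst();                           // evict the oldest value
        }
        double sum = 0;
        for (double v : values) {
            sum += v;
        }
        return sum / values.size();
    }

    public static void main(String[] args) {
        LastNMovingAverage avg = new LastNMovingAverage(3);
        for (double v : new double[] {1, 2, 3, 4}) {
            System.out.println("a -> " + avg.update("a", v));
        }
    }
}
```

Until a key has seen N values, each result averages over fewer than N values. Whether that warm-up behavior is acceptable depends on the use case.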

It seemed like using an AggregateFunction would be most consistent with the Flink API, along the lines of...

            .keyBy(new MyKeySelector())
            .window(GlobalWindows.create())
            .trigger(CountTrigger.of(1))
            .aggregate(new MovingAverageAggregator(10))

This works, but the API for the AggregateFunction (MovingAverageAggregator) feels a bit odd.

Specifically, I want to emit a <key, moving average> result from getResult(), but the key isn't passed to createAccumulator() or to getResult(). So in add() I check whether the accumulator I've created already has a key set; if not, I extract the key from the record and set it on the accumulator, so that I can use it in getResult().
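The key-stashing approach described above might look roughly like the sketch below. To keep it self-contained, AggregateFunctionShape is a hypothetical local interface. It mirrors the four methods of Flink's AggregateFunction<IN, ACC, OUT>: createAccumulator, add, getResult, and merge. In a real job you would implement Flink's interface directly. Event, Acc, and KeyedAverage are also illustrative types and are not part of any Flink API.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of "stash the key in the accumulator". AggregateFunctionShape is a hypothetical
// stand-in mirroring Flink's AggregateFunction<IN, ACC, OUT>, so this compiles without Flink.
class KeyStashingAverage {

    interface AggregateFunctionShape<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Hypothetical input record: a key plus a numeric value.
    static final class Event {
        final String key;
        final double value;
        Event(String key, double value) { this.key = key; this.value = value; }
    }

    // Accumulator carrying the key alongside the last N values.
    static final class Acc {
        String key;                                     // unknown until the first add()
        final Deque<Double> values = new ArrayDeque<>();
    }

    // Result: <key, moving average>.
    static final class KeyedAverage {
        final String key;
        final double average;
        KeyedAverage(String key, double average) { this.key = key; this.average = average; }
    }

    static final class MovingAverageAggregator
            implements AggregateFunctionShape<Event, Acc, KeyedAverage> {
        private final int windowSize;

        MovingAverageAggregator(int windowSize) { this.windowSize = windowSize; }

        @Override
        public Acc createAccumulator() {
            return new Acc();                           // no key is available at this point
        }

        @Override
        public Acc add(Event event, Acc acc) {
            if (acc.key == null) {
                acc.key = event.key;                    // stash the key from the first record
            }
            acc.values.addLast(event.value);
            if (acc.values.size() > windowSize) {
                acc.values.removeFirst();               // keep only the last N values
            }
            return acc;
        }

        @Override
        public KeyedAverage getResult(Acc acc) {
            double sum = 0;
            for (double v : acc.values) {
                sum += v;
            }
            double avg = acc.values.isEmpty() ? Double.NaN : sum / acc.values.size();
            return new KeyedAverage(acc.key, avg);
        }

        @Override
        public Acc merge(Acc a, Acc b) {
            // Both accumulators belong to the same key: append b's values, then trim to N.
            if (a.key == null) {
                a.key = b.key;
            }
            a.values.addAll(b.values);
            while (a.values.size() > windowSize) {
                a.values.removeFirst();
            }
            return a;
        }
    }

    public static void main(String[] args) {
        MovingAverageAggregator agg = new MovingAverageAggregator(2);
        Acc acc = agg.createAccumulator();
        for (double v : new double[] {1, 3, 5}) {
            acc = agg.add(new Event("sensor-1", v), acc);
        }
        KeyedAverage result = agg.getResult(acc);
        System.out.println(result.key + " -> " + result.average);
    }
}
```

The cost of this design is that every accumulator carries its own copy of the key, and merge() also has to account for it.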

Is this expected, or am I misusing the functionality?

Thanks,

— Ken

--------------------------
Ken Krugler
custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra

Re: Stashing key with AggregateFunction

Stefan Richter
Hi,

I see two possible options for achieving this. The first is that you can always derive the key again from the input of the aggregate function. The second is to combine an AggregateFunction with a ProcessWindowFunction. With the second option you still get incremental aggregation, and the ProcessWindowFunction is only called when the window fires, with the aggregated result.
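The second option (which Fabian also describes in his reply) splits the work. The AggregateFunction computes only the average. A ProcessWindowFunction, which Flink calls with the key, then attaches that key. The plain-Java model below is a hypothetical sketch. attachKey() mimics the shape of ProcessWindowFunction.process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out), minus the context and collector. onElement() stands in for the window operator firing once per element under CountTrigger.of(1). In an actual job, the two functions would be wired together with WindowedStream.aggregate(aggregateFunction, processWindowFunction).

```java
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.Map;

// Plain-Java model of "AggregateFunction + ProcessWindowFunction": the aggregator never
// sees the key; a PWF-shaped step attaches it to the single pre-aggregated result.
// onElement() only simulates what the window operator does on each firing; it is not a Flink API.
class AverageThenAttachKey {

    // Key-free aggregator: the accumulator is just the last N values.
    static final class MovingAverage {
        private final int windowSize;

        MovingAverage(int windowSize) { this.windowSize = windowSize; }

        Deque<Double> createAccumulator() { return new ArrayDeque<>(); }

        Deque<Double> add(double value, Deque<Double> acc) {
            acc.addLast(value);
            if (acc.size() > windowSize) {
                acc.removeFirst();                      // keep only the last N values
            }
            return acc;
        }

        double getResult(Deque<Double> acc) {
            double sum = 0;
            for (double v : acc) {
                sum += v;
            }
            return sum / acc.size();
        }
    }

    // Shaped like ProcessWindowFunction.process(key, context, elements, out) without the
    // context and collector: the key comes from the framework, and `elements` holds
    // exactly one value, the aggregator's result.
    static Map.Entry<String, Double> attachKey(String key, Iterable<Double> elements) {
        Double average = elements.iterator().next();
        return new AbstractMap.SimpleImmutableEntry<>(key, average);
    }

    // One element arrives and the trigger fires: aggregate incrementally, then attach the key.
    static Map.Entry<String, Double> onElement(
            String key, double value, Deque<Double> acc, MovingAverage agg) {
        agg.add(value, acc);
        return attachKey(key, Collections.singletonList(agg.getResult(acc)));
    }

    public static void main(String[] args) {
        MovingAverage agg = new MovingAverage(3);
        Deque<Double> acc = agg.createAccumulator();    // Flink keeps one accumulator per key
        for (double v : new double[] {2, 4, 6, 8}) {
            System.out.println(onElement("sensor-1", v, acc, agg));
        }
    }
}
```

In this model the accumulator no longer needs a key field. This matches the point that the AggregateFunction can be simpler when a ProcessWindowFunction supplies the key.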

Best,
Stefan

In reply to Ken Krugler's message of 03.05.2018, 19:53.

Re: Stashing key with AggregateFunction

Fabian Hueske
In reply to this post by Ken Krugler
Hi Ken,

You can also use an additional ProcessWindowFunction [1] that is applied to the result of the AggregateFunction to set the key.
Since the PWF is only applied to the final result, there is no overhead (it might actually be slightly cheaper, because the AggregateFunction can be simpler).

If you don't want to use a PWF, your approach is the right one.

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation

Re: Stashing key with AggregateFunction

Ken Krugler
Hi Fabian & Stefan,

Thanks, and yes, that does work more like what I'd expect.

Regards,

— Ken

PS: Just FYI, the Java code examples in the documentation Fabian referenced have a number of bugs; see FLINK-9299.


In reply to Fabian Hueske's message of May 4, 2018, 7:35 AM.

Re: Stashing key with AggregateFunction

Fabian Hueske
Hi Ken,

Thanks for the bug report!

Fabian

In reply to Ken Krugler's message of 2018-05-05 0:46 GMT+02:00.