Custom Aggregate - Example

Philipp Goetze
Hello community,

how do I define a custom aggregate function in Flink Streaming (Scala)?
Could you please provide an example on how to do that?

Thank you and best regards,
Philipp
Re: Custom Aggregate - Example

Aljoscha Krettek
Hi,
with the current API this should do what you are after:

val input = ...
 
val result = input
  .window(...)
  .groupBy(...)
  .reduceWindow( /* your reduce function */ )

With the reduce function you should be able to implement any custom aggregations. You can also use foldWindow() if you want to do a functional fold over the window.
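To illustrate the difference between the two, here is a minimal sketch that uses plain Scala collections as a stand-in for the contents of one window, not Flink's window API. The `Reading` type, the sample data, and the function names are made up for the example. A reduce function combines two elements of the same type, while a fold carries an accumulator that may have a different type.

```scala
object WindowAggregateSketch {
  // Hypothetical element type for this example: (key, value).
  type Reading = (String, Double)

  // Reduce: combines two elements of the same type into one.
  // Here it keeps the reading with the larger value (a custom "max by value").
  def maxReading(a: Reading, b: Reading): Reading =
    if (a._2 >= b._2) a else b

  // Fold: the accumulator may have a different type than the elements.
  // Here it is (sum, count), from which an average can be derived.
  def sumAndCount(acc: (Double, Long), r: Reading): (Double, Long) =
    (acc._1 + r._2, acc._2 + 1)

  // Stand-in for the elements of one window for one key.
  val window: List[Reading] = List(("a", 1.0), ("a", 4.0), ("a", 2.5))

  val reduced: Reading = window.reduce(maxReading)
  val folded: (Double, Long) = window.foldLeft((0.0, 0L))(sumAndCount)
  val average: Double = folded._1 / folded._2
}
```

The same two functions would be the kind of thing passed to a windowed reduce or fold; how they are wired into the window operators depends on the Flink version in use.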

I hope this helps.

Cheers,
Aljoscha 

On Fri, 21 Aug 2015 at 14:51 Philipp Goetze <[hidden email]> wrote:
Hello community,

how do I define a custom aggregate function in Flink Streaming (Scala)?
Could you please provide an example on how to do that?

Thank you and best regards,
Philipp
Re: Custom Aggregate - Example

Gyula Fóra-2
Hi,

Alternatively, if you would like to create continuous aggregates per key, you can use ds.groupBy().reduce(..), or use one of the stateful functions in the Scala API, such as mapWithState.

For a rolling average per key, you can check this example:
https://github.com/gyfora/summer-school/blob/master/flink/src/main/scala/summerschool/FlinkKafkaExample.scala
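As a rough idea of what such a stateful rolling average could look like, here is a sketch in plain Scala that simulates per-key state with a map. It is not taken from the linked example. The function `rollingAvg` takes an element and an optional previous state and returns the output together with the new state, which is the shape I would expect a mapWithState-style function to have; treat that shape and all names here as assumptions.

```scala
object RollingAverageSketch {
  // Per-key state: running sum and count of the values seen so far.
  type State = (Double, Long)

  // Takes the incoming (key, value) element and the previous state for its key;
  // returns the element to emit (key, running average) and the updated state.
  def rollingAvg(in: (String, Double), state: Option[State]): ((String, Double), Option[State]) = {
    val (sum, count) = state.getOrElse((0.0, 0L))
    val newSum = sum + in._2
    val newCount = count + 1
    ((in._1, newSum / newCount), Some((newSum, newCount)))
  }

  // Local simulation of keyed state: a map from key to state, threaded through the input.
  def run(input: Seq[(String, Double)]): Seq[(String, Double)] = {
    val start = (Map.empty[String, State], Vector.empty[(String, Double)])
    val (_, out) = input.foldLeft(start) { case ((states, emitted), elem) =>
      val (result, newState) = rollingAvg(elem, states.get(elem._1))
      val updated = newState.fold(states)(s => states.updated(elem._1, s))
      (updated, emitted :+ result)
    }
    out
  }
}
```

For example, `RollingAverageSketch.run(Seq(("a", 2.0), ("b", 10.0), ("a", 4.0)))` emits one running average per input element, with the averages of "a" and "b" tracked independently.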

Cheers,
Gyula

On Fri, Aug 21, 2015 at 3:28 PM Aljoscha Krettek <[hidden email]> wrote:
Hi,
with the current API this should do what you are after:

val input = ...
 
val result = input
  .window(...)
  .groupBy(...)
  .reduceWindow( /* your reduce function */ )

With the reduce function you should be able to implement any custom aggregations. You can also use foldWindow() if you want to do a functional fold over the window.

I hope this helps.

Cheers,
Aljoscha 

Re: Custom Aggregate - Example

Philipp Goetze
In reply to this post by Aljoscha Krettek
Thank you Aljoscha,

I guessed that I should use the reduce method. However, I am not looking for window aggregations. I want to do this on a grouped stream.

The problem is that we work with Lists instead of tuples, and thus we cannot use the pre-implemented aggregates.

So the idea is to call it like this:
val aggr = source.groupBy(_(0)).reduce(new customReducer(1))
And this is the signature of the class:
class customReducer(field: Int) extends RichReduceFunction[List[Any]]

How should I implement this class so that it works correctly even with parallelism > 1?

I hope you understand what I am trying to do. =)

Kind Regards,
Philipp
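
One possible shape for such a reducer, as a sketch rather than a confirmed answer from the thread: the class below sums the values at position `field` and keeps the remaining fields of the first record. It is written as a standalone class (named `CustomReducer` here) so that it runs without Flink; in a job it would presumably extend RichReduceFunction[List[Any]] as in the signature above. The choice of summing, and of converting to Double, is an assumption made for the example.

```scala
// Standalone stand-in: in a Flink job this would extend
// RichReduceFunction[List[Any]] and `reduce` would be an override.
class CustomReducer(field: Int) extends Serializable {

  // Combines two records with the same key: keeps the fields of `a`
  // and replaces position `field` with the sum of both values (as a Double).
  def reduce(a: List[Any], b: List[Any]): List[Any] = {
    val sum = toDouble(a(field)) + toDouble(b(field))
    a.updated(field, sum)
  }

  // Elements of a List[Any] are boxed, so numeric values match java.lang.Number.
  private def toDouble(x: Any): Double = x match {
    case n: Number => n.doubleValue()
    case other =>
      throw new IllegalArgumentException(s"Field $field is not numeric: $other")
  }
}

object CustomReducerDemo {
  // Local simulation of a grouped reduce: group by field 0, then reduce each group.
  def aggregate(records: List[List[Any]], reducer: CustomReducer): Map[Any, List[Any]] =
    records.groupBy(_(0)).map { case (key, group) =>
      key -> group.reduce((a, b) => reducer.reduce(a, b))
    }
}
```

Regarding parallelism > 1: as far as I understand, grouping routes all records with the same key to the same parallel instance, so a reduce function that is a pure function of its two arguments, like the one above, should not need any coordination of its own.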

