Hello community,

How do I define a custom aggregate function in Flink Streaming (Scala)? Could you please provide an example of how to do that?

Thank you and best regards,
Philipp
Hi,

with the current API this should do what you are after:

    val input = ...
    val result = input
      .window(...)
      .groupBy(...)
      .reduceWindow( /* your reduce function */ )

With the reduce function you should be able to implement any custom aggregation. You can also use foldWindow() if you want to do a functional fold over the window. I hope this helps.

Cheers,
Aljoscha

On Fri, 21 Aug 2015 at 14:51 Philipp Goetze <[hidden email]> wrote:
Hello community,
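Aljoscha's outline can be fleshed out into a small self-contained sketch. Note that the thread uses the 2015-era API; in later Flink releases the equivalent pattern keys the stream first and then applies a window and a plain reduce. All names, the 5-second tumbling window, and the per-key sum below are illustrative assumptions, not from the thread:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object WindowReduceSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Hypothetical input: (key, value) pairs
    val input: DataStream[(String, Int)] =
      env.fromElements(("a", 1), ("a", 2), ("b", 3))

    // Key first, then window, then reduce -- the reduce function
    // is the custom aggregate (here: a per-key sum within each window)
    val result = input
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .reduce((x, y) => (x._1, x._2 + y._2))

    result.print()
    env.execute("window reduce sketch")
  }
}
```

Any associative, commutative function can stand in for the lambda passed to reduce, which is why this covers arbitrary custom aggregations.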
Hi,

Alternatively, if you would like to create continuous aggregates per key, you can use ds.groupBy().reduce(..), or use one of the stateful functions in the Scala API such as mapWithState. For a rolling average per key you can check this example: https://github.com/gyfora/summer-school/blob/master/flink/src/main/scala/summerschool/FlinkKafkaExample.scala

Cheers,
Gyula

On Fri, Aug 21, 2015 at 3:28 PM Aljoscha Krettek <[hidden email]> wrote:
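In the spirit of the example Gyula links, a rolling average per key with mapWithState might look like the sketch below. The state holds a (sum, count) pair per key; the stream contents and names are hypothetical:

```scala
import org.apache.flink.streaming.api.scala._

object RollingAverageSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Hypothetical readings: (sensor id, measurement)
    val readings: DataStream[(String, Double)] =
      env.fromElements(("sensor1", 10.0), ("sensor1", 20.0), ("sensor2", 5.0))

    // mapWithState keeps one (sum, count) state per key and emits
    // the running average after every element
    val rollingAvg: DataStream[(String, Double)] = readings
      .keyBy(_._1)
      .mapWithState[(String, Double), (Double, Long)] { (in, state) =>
        val (sum, count) = state.getOrElse((0.0, 0L))
        val newSum   = sum + in._2
        val newCount = count + 1
        ((in._1, newSum / newCount), Some((newSum, newCount)))
      }

    rollingAvg.print()
    env.execute("rolling average sketch")
  }
}
```

Because the stream is keyed, each key's state lives on exactly one parallel instance, so the running average stays consistent at parallelism > 1.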
Thank you Aljoscha,
I guessed that I should use the reduce method. However, I am not looking for window aggregations; I want to do this on a grouped stream. The problem is that we work with Lists instead of tuples, and thus we cannot use the pre-implemented aggregates. So the idea is to call it like this:

    val aggr = source.groupBy(_(0)).reduce(new customReducer(1))

And this is the signature of the class:

    class customReducer(field: Int) extends RichReduceFunction[List[Any]]

How do I have to implement this class now, so that it works correctly even with parallelism > 1? I hope you understand what I am trying to do. =)

Kind Regards,
Philipp

On 21.08.2015 15:28, Aljoscha Krettek wrote:
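The thread leaves Philipp's implementation question open. A minimal sketch of such a class might look like the following, assuming the indexed field holds integers and the desired aggregate is a running sum (one hypothetical choice among many). The key point for parallelism > 1 is that groupBy routes all records of one key to the same parallel instance, so the reduce function only needs to be associative and commutative:

```scala
import org.apache.flink.api.common.functions.RichReduceFunction

// Sketch of a reducer over List[Any] records: keeps all fields of the
// first record and sums the numeric field at the given index.
// Summation is associative and commutative, so the result is the same
// regardless of the order in which Flink combines partial results.
class customReducer(field: Int) extends RichReduceFunction[List[Any]] {
  override def reduce(a: List[Any], b: List[Any]): List[Any] = {
    val sum = a(field).asInstanceOf[Int] + b(field).asInstanceOf[Int]
    a.updated(field, sum)
  }
}

// Hypothetical usage, matching the call site in the question:
// val aggr = source.groupBy(_(0)).reduce(new customReducer(1))
```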