ReduceByKeyAndWindow in Flink

ReduceByKeyAndWindow in Flink

snntr
Hi everyone,

me again :) Let's say you have a stream, and for every window and key
you compute some aggregate value, like this:

DataStream.keyBy(..)
          .timeWindow(..)
          .apply(...)


Now I want to get the maximum aggregate value for every window over the
keys. This feels like a pretty natural use case. How can I achieve this
with Flink in the most compact way?

The options I thought of so far are:

* Use a timeWindowAll, obviously. The drawback is that the WindowFunction
would no longer be distributed by key.

* Use a windowAll after the WindowFunction to create windows of the
aggregates that originated from the same timeWindow. This could be done
either with a TimeWindow or with a GlobalWindow plus a DeltaTrigger.
Drawback: it seems unnecessarily complicated and doubles the latency (at
least in my naive implementation ;)).

* Of course, you could also just keyBy the start time of the window
after the WindowFunction, but then you get more than one event for each
window.

Is there some easy way I am missing? If not, is there a technical
reason why such a "reduceByKeyAndWindow" operator is not available in
Flink?

Cheers,

Konstantin

Re: ReduceByKeyAndWindow in Flink

Fabian Hueske-2
Hi Konstantin,

let me first summarize to make sure I understood what you are looking for.
You computed an aggregate over a keyed event-time window and you are looking for the maximum aggregate for each group of windows over the same period of time.
So if you have
(key: 1, w-time: 10, agg: 17)
(key: 2, w-time: 10, agg: 20)
(key: 1, w-time: 20, agg: 30)
(key: 2, w-time: 20, agg: 28)
(key: 3, w-time: 20, agg: 5)

you would like to get:
(key: 2, w-time: 10, agg: 20)
(key: 1, w-time: 20, agg: 30)

If this is correct, you can do it as follows: extract the window start and end time from the TimeWindow parameter of the WindowFunction, key the stream by either the start or the end time, and apply a ReduceFunction on the keyed stream.
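
A minimal sketch of such a WindowFunction (Scala API; the tuple type, the class name, and the sum aggregate are placeholders, and exact signatures have shifted between Flink versions):

import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Emits one (key, windowEndTime, aggregate) record per key and window.
class AggPerWindow
    extends WindowFunction[(Int, Double), (Int, Long, Double), Int, TimeWindow] {
  override def apply(key: Int, window: TimeWindow, input: Iterable[(Int, Double)],
                     out: Collector[(Int, Long, Double)]): Unit = {
    val agg = input.map(_._2).sum           // stand-in for the actual aggregate
    out.collect((key, window.getEnd, agg))  // window.getStart works just as well
  }
}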

Best, Fabian

Re: ReduceByKeyAndWindow in Flink

snntr
Hi Fabian,

thanks for your answer. Yes, that's what I want.

The solution you suggest is what I am doing right now (see the last
bullet point in my question).

But given your example, I would expect the following output:

(key: 1, w-time: 10, agg: 17)
(key: 2, w-time: 10, agg: 20)
(key: 1, w-time: 20, agg: 30)
(key: 1, w-time: 20, agg: 30)
(key: 1, w-time: 20, agg: 30)

Because the reduce function is evaluated for every incoming event (i.e.
each key), right?

Cheers,

Konstantin

--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Re: ReduceByKeyAndWindow in Flink

Matthias J. Sax-2
Hi,

Can't you use a second keyed window (with the same size) and apply
.max(...)?
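
As a rough sketch of what that could look like (Scala API; the field positions and the 10s window size are assumptions, and maxBy is used here so the whole record survives, not just the field):

import org.apache.flink.streaming.api.windowing.time.Time

// aggregates: DataStream[(Int, Long, Double)] = (origKey, windowEnd, agg)
aggregates
  .keyBy(_._2)                   // key by the window end time
  .timeWindow(Time.seconds(10))  // same size as the first window
  .maxBy(2)                      // record with the largest agg field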

-Matthias

Re: ReduceByKeyAndWindow in Flink

Fabian Hueske-2
In reply to this post by snntr
If you set the key to the time attribute, the "old" key is no longer relevant.
The stream is then organized by time, and only one aggregate should be computed for each window time.

This should do what you are looking for:

DataStream
  .keyBy(_._1) // key by original key
  .timeWindow(..)
  .apply(...)  // extract window end time: (origKey, time, agg)
  .keyBy(_._2) // key by time field
  .maxBy(_._3) // value with max agg field
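
Spelled out in the Scala API, it could look like this (a sketch only: the input tuple type, the window size, and the sum aggregate are assumptions; note that maxBy on a keyed stream takes a field position):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val result = stream                       // DataStream[(Int, Double)], assumed
  .keyBy(_._1)                            // original key
  .timeWindow(Time.seconds(10))
  .apply { (key: Int, w: TimeWindow, in: Iterable[(Int, Double)],
            out: Collector[(Int, Long, Double)]) =>
    out.collect((key, w.getEnd, in.map(_._2).sum))  // (origKey, time, agg)
  }
  .keyBy(_._2)                            // window end time
  .maxBy(2)                               // record with the largest agg field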

Best, Fabian

Re: ReduceByKeyAndWindow in Flink

Gyula Fóra-2
Hi,

Alright, it seems there are multiple ways of doing this.

I would do something like:

ds.keyBy(key)
  .timeWindow(w)
  .reduce(...)
  .timeWindowAll(w)
  .reduce(...)
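
Concretely, something like this (a sketch assuming ds: DataStream[(Int, Long, Double)] with the value in field _3, max as a stand-in aggregate, and a 10s window; both windows must have the same size):

import org.apache.flink.streaming.api.windowing.time.Time

val w = Time.seconds(10)
ds.keyBy(_._1)
  .timeWindow(w)
  .reduce((a, b) => if (a._3 > b._3) a else b)  // per-key aggregate per window
  .timeWindowAll(w)
  .reduce((a, b) => if (a._3 > b._3) a else b)  // overall max for the same window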

Maybe Aljoscha could jump in here :D

Cheers,
Gyula

Re: ReduceByKeyAndWindow in Flink

snntr
Thanks!

@Fabian: Yepp, but this still results in multiple outputs per window,
because the maximum is emitted for every key.

@Gyula: Yepp, that's the second bullet point from my question ;) The way
I implemented it, it basically doubles the latency, because the
timeWindowAll has to wait for the next timeWindow before it can close
the previous one. So if the first timeWindow is 10s, it takes 20s until
you have a result, although it can't change after 10s. You know what I mean?

Cheers,

Konstantin

Re: ReduceByKeyAndWindow in Flink

Gyula Fóra-2
Yes, you are right. I think we should have some nice abstractions for doing this.

Before the rewrite of the windowing runtime to support out-of-order events, we had abstractions for supporting this, but that code was not feasible from a performance perspective. (The result of a keyed window reduce used to be a window containing all the aggregates, and one could then just aggregate again on the result without specifying the window again.)

Maybe we could implement similar abstractions on top of the new window runtime; I think that would be really awesome.

Gyula

Re: ReduceByKeyAndWindow in Flink

Aljoscha Krettek
Hi,
@Konstantin: are you using event-time or processing-time windows? If you are using processing time, then you can only do it the way Fabian suggested. The problem there, however, is that the .keyBy().reduce() combination emits a new maximum for every element that arrives, and you never know when you have seen the final element, i.e. the maximum.

If you are using event-time, then you are indeed lucky because then you can use what Gyula suggested and you won’t have latency, if I’m correct. The reason is that the watermark that flushes out the windows in the first (keyed window) will also flush out the elements in the all-window. So the keyed window will do computations, send along the elements and then after it is done it will forward the watermark. This watermark will immediately trigger computation of the all-window for the same time period.
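
For completeness, a sketch of the event-time setup this relies on (the timestamp extractor is an assumption; with out-of-order events you would assign watermarks explicitly instead):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// Drive windows and watermarks by event time; assumes field _2 holds a
// millisecond timestamp and events arrive in order.
val withTs = events.assignAscendingTimestamps(_._2)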

Cheers,
Aljoscha

Re: ReduceByKeyAndWindow in Flink

Stephan Ewen
One addition: You can set the system to use "ingestion time", which gives you event time with auto-generated timestamps and watermarks, based on the time that the events are seen in the sources.

That way you have the same simplicity as processing time, and you get the window alignment that Aljoscha described (the second, total-max window sees the same elements as the initial max-per-key window).
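
A sketch of that setting (Scala API):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Timestamps and watermarks are then generated automatically at the sources.
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)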
