What happens to the aggregate in a fold on a KeyedStream if the events for a key stop coming?

Bart van Deenen
If I do a fold on a KeyedStream, I aggregate the events per key.
My question is: what happens to the aggregate (and its key) when
events for this key stop coming?
My keys are browser session keys, so they are virtually unlimited.

Ideally, I'd like to send some sort of purge event for each key a couple
of days later, which would empty the aggregate in the fold. That still
leaves the key, though. Where does that go?

Any answers highly appreciated...

Greetings

Bart

Fabian Hueske
Hi Bart,

if you run a fold function on a keyed stream without a window, there is no way to remove the key and the folded value.
You will eventually run out of memory if your key space is continuously growing.
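To make the memory problem concrete, here is a toy Python model of the non-windowed case. It is not Flink code, and the name keyed_fold is a hypothetical illustration. The fold emits an updated aggregate for every event and keeps one accumulator per key, but there is no code path that ever deletes a key, so the state grows with the number of distinct session keys ever seen.

```python
from typing import Callable, Dict, Hashable, Iterable, List, Tuple, TypeVar

K = TypeVar("K", bound=Hashable)
V = TypeVar("V")
A = TypeVar("A")


def keyed_fold(
    events: Iterable[Tuple[K, V]],
    initial: A,  # assumed immutable, since it is shared as the start value for every key
    fold: Callable[[A, V], A],
) -> Tuple[List[Tuple[K, A]], Dict[K, A]]:
    """Toy model of a non-windowed fold on a keyed stream.

    Emits the updated aggregate for every event and keeps one accumulator
    per key. Nothing ever removes a key from `state`.
    """
    state: Dict[K, A] = {}
    emitted: List[Tuple[K, A]] = []
    for key, value in events:
        acc = fold(state.get(key, initial), value)
        state[key] = acc  # stays here even if this key never shows up again
        emitted.append((key, acc))
    return emitted, state


# Every browser session sends a single event and then goes quiet.
events = [(f"session-{i}", 1) for i in range(10_000)]
emitted, state = keyed_fold(events, 0, lambda acc, v: acc + v)
print(len(state))  # 10000: one accumulator per session ever seen
```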

If you apply a fold function in a window on a keyed stream, you can bound the "lifetime" of the key and value.
As with a non-windowed fold, you can emit a record for each incoming record. Additionally, you can register a timer to purge the window content after a certain time (such as a few days). This blog post should be a good introduction to Flink's window and trigger mechanism [1].

Best, Fabian

[1] http://flink.apache.org/news/2015/12/04/Introducing-windows.html


(In reply to Bart van Deenen, 2016-03-18 11:42 GMT+01:00)

Bart van Deenen
Hi Fabian

So you're saying that with a windowed stream I can still emit a folded aggregate for each event as it comes in? I didn't realize that; I thought windows were a sort of micro-batching.
I'll go read the link you posted.

Thanks

--
Bart van Deenen

(In reply to Fabian Hueske, Fri, Mar 18, 2016, at 11:54)

Fabian Hueske
Yes, that's possible.

You have to implement a custom trigger for that. The Trigger.onElement() method is called for each incoming event. If it returns TriggerResult.FIRE, the WindowFunction is called. You can also register a timer, which calls Trigger.onProcessingTime() or Trigger.onEventTime() (depending on the kind of timer) once the time is up, and from there you can return TriggerResult.PURGE to clear the window.
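The trigger logic described above can be sketched as a small Python simulation. This is a toy model under stated assumptions, not Flink's API: TriggerResult, FireEveryElementPurgeAfter and WindowedFoldOperator are hypothetical stand-ins, and timers are only checked when advance_time() is called. The fold is applied to every element, every element makes the trigger return FIRE (so an updated aggregate is emitted per event), and a timer registered on a key's first element makes the trigger return PURGE, which drops both the aggregate and the key. In Flink itself the trigger would implement onElement(), onProcessingTime() and onEventTime() and register timers through its TriggerContext (registerProcessingTimeTimer / registerEventTimeTimer).

```python
from enum import Enum
from typing import Any, Callable, Dict, List, Tuple


class TriggerResult(Enum):
    """Toy stand-in for the trigger results mentioned in the thread."""
    CONTINUE = "continue"
    FIRE = "fire"
    PURGE = "purge"


class FireEveryElementPurgeAfter:
    """Hypothetical trigger: FIRE on every element, PURGE `retention` after a key's first element."""

    def __init__(self, retention: float) -> None:
        self.retention = retention

    def on_element(self, key: Any, now: float,
                   register_timer: Callable[[Any, float], None]) -> TriggerResult:
        register_timer(key, now + self.retention)  # the operator keeps only the first deadline
        return TriggerResult.FIRE  # emit the updated aggregate for this element

    def on_processing_time(self, key: Any, timestamp: float) -> TriggerResult:
        return TriggerResult.PURGE  # time is up: drop the aggregate and the key


class WindowedFoldOperator:
    """Toy keyed window operator: folds eagerly and asks the trigger when to emit or purge."""

    def __init__(self, initial: Any, fold: Callable[[Any, Any], Any],
                 trigger: FireEveryElementPurgeAfter) -> None:
        self.initial = initial
        self.fold = fold
        self.trigger = trigger
        self.state: Dict[Any, Any] = {}     # one accumulator per active key
        self.timers: Dict[Any, float] = {}  # key -> purge deadline
        self.output: List[Tuple[Any, Any]] = []

    def _register_timer(self, key: Any, deadline: float) -> None:
        self.timers.setdefault(key, deadline)  # later registrations do not move the deadline

    def advance_time(self, now: float) -> None:
        """Call the trigger for every timer whose deadline has passed."""
        due = [key for key, deadline in self.timers.items() if deadline <= now]
        for key in due:
            del self.timers[key]
            if self.trigger.on_processing_time(key, now) is TriggerResult.PURGE:
                self.state.pop(key, None)

    def process(self, key: Any, value: Any, now: float) -> None:
        self.advance_time(now)
        # The fold runs for every element, regardless of what the trigger decides.
        self.state[key] = self.fold(self.state.get(key, self.initial), value)
        if self.trigger.on_element(key, now, self._register_timer) is TriggerResult.FIRE:
            self.output.append((key, self.state[key]))


DAY = 24 * 3600.0
op = WindowedFoldOperator(0, lambda acc, v: acc + v, FireEveryElementPurgeAfter(2 * DAY))
op.process("session-1", 1, now=0.0)
op.process("session-1", 1, now=60.0)
op.process("session-2", 5, now=120.0)
print(op.output)  # [('session-1', 1), ('session-1', 2), ('session-2', 5)]
op.advance_time(3 * DAY)
print(op.state)   # {}
```

In this sketch the lifetime is fixed from a key's first event, like a window of fixed length. Purging a few days after a session's last event would instead require moving the deadline forward on every element; that variant is not shown.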

This other blog post shows how to define a custom trigger [1].

Best, Fabian

[1] https://www.mapr.com/blog/essential-guide-streaming-first-processing-apache-flink


(In reply to Bart van Deenen, 2016-03-18 12:02 GMT+01:00)

Bart van Deenen
Hi Fabian

I'm starting to get it :-)
Do you think it's feasible to have one 24-hour window per key (with, say, a million keys active at the same time)? In other words, is a window a heavy thing?
I ask because I really like the idea of having my aggregation run as each event comes in. It just feels more natural than some sort of micro-batching.

Thanks

Bart

--
Bart van Deenen

(In reply to Fabian Hueske, Fri, Mar 18, 2016, at 12:16)

Fabian Hueske
The "weight" of a window depends on the function that you apply.
If you apply a generic WindowFunction, Flink stores all elements that arrived for the window and applies the function when the trigger returns FIRE.
If you apply a FoldFunction (or ReduceFunction), the function is called for each arriving element (regardless of the trigger) and only a single value is stored. This value is emitted whenever the trigger returns FIRE.

So, if you have a 24h window on a keyed stream with a FoldFunction, Flink holds one value for each window (plus a bit of metadata). The number of elements held in memory does not depend on how many elements arrived for a window or on how long the window is open; it depends only on the number of currently active windows (i.e., active keys).
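A toy Python comparison of the two cases, with hypothetical class names; it counts stored elements rather than bytes and ignores per-window metadata. A generic WindowFunction has to buffer every element until the trigger fires, whereas a FoldFunction folds each element in on arrival and keeps one value per key.

```python
from typing import Any, Callable, Dict, List


class BufferingWindowState:
    """Generic WindowFunction case: every element is kept until the trigger fires."""

    def __init__(self) -> None:
        self.buffers: Dict[Any, List[Any]] = {}

    def add(self, key: Any, value: Any) -> None:
        self.buffers.setdefault(key, []).append(value)

    def stored_elements(self) -> int:
        return sum(len(buf) for buf in self.buffers.values())


class FoldingWindowState:
    """FoldFunction case: each element is folded in on arrival, one value per key."""

    def __init__(self, initial: Any, fold: Callable[[Any, Any], Any]) -> None:
        self.initial = initial
        self.fold = fold
        self.accumulators: Dict[Any, Any] = {}

    def add(self, key: Any, value: Any) -> None:
        self.accumulators[key] = self.fold(self.accumulators.get(key, self.initial), value)

    def stored_elements(self) -> int:
        return len(self.accumulators)


buffering = BufferingWindowState()
folding = FoldingWindowState(0, lambda acc, v: acc + v)
for event in range(50):          # 50 events per key ...
    for key in range(1_000):     # ... for 1,000 active keys
        buffering.add(key, event)
        folding.add(key, event)
print(buffering.stored_elements())  # 50000
print(folding.stored_elements())    # 1000
```

By the same reasoning, a million active session keys would mean a million accumulators in the fold case, regardless of how many events each session produced.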

(In reply to Bart van Deenen, 2016-03-18 12:22 GMT+01:00)

Bart van Deenen
OK, thanks! I'll do it that way, with a custom trigger and a fold per key.

Bart

--
Bart van Deenen

(In reply to Fabian Hueske, Fri, Mar 18, 2016, at 13:31)