Reduce one event under multiple keys

Stephen Connolly
Ok, I'll try and map my problem into something that should be familiar to most people.

Consider a collection of PCs, each of which has a unique ID, e.g. ca:fe:ba:be, de:ad:be:ef, etc.

Each PC has a tree of local files. Some of the file paths are coincidentally the same names, but there is no file sharing between PCs.

I need to produce metrics about how often files are opened and how long they are open for.

For every X-minute tumbling window I need not just the cumulative averages for each PC, but also the averages for each file and the cumulative averages for each folder and its sub-folders.

I have a stream of events like

{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
{"source":"de:ad:be:ef","action":"open","path":"/foo/manchu/README.txt"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}
{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/User guide.txt","duration":"97"}
{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/Admin guide.txt","duration":"196"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/manchu/README.txt"}
{"source":"de:ad:be:ef","action":"open","path":"/bar/foo/README.txt"}

So from that I would like to know stuff like:

ca:fe:ba:be had 4/X opens per minute in the X minute window
ca:fe:ba:be had 3/X closes per minute in the X minute window and the average time open was (67+97+196)/3=120... there is no guarantee that the closes will be matched with opens in the same window, which is why I track them separately
de:ad:be:ef had 2/X opens per minute in the X minute window
ca:fe:ba:be /foo had 4/X opens per minute in the X minute window
ca:fe:ba:be /foo had 3/X closes per minute in the X minute window and the average time open was 120
de:ad:be:ef /foo had 1/X opens per minute in the X minute window
de:ad:be:ef /bar had 1/X opens per minute in the X minute window
de:ad:be:ef /foo/manchu had 1/X opens per minute in the X minute window
de:ad:be:ef /bar/foo had 1/X opens per minute in the X minute window
de:ad:be:ef /foo/manchu/README.txt had 1/X opens per minute in the X minute window
de:ad:be:ef /bar/foo/README.txt had 1/X opens per minute in the X minute window
etc

What I think I want to do is turn each event into a series of events with different keys, so that

{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}

gets sent under the keys:

("ca:fe:ba:be","/")
("ca:fe:ba:be","/foo")
("ca:fe:ba:be","/foo/bar")
("ca:fe:ba:be","/foo/bar/README.txt")
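The fan-out is just prefix expansion of the path. A minimal sketch of that helper in plain Java (outside any Flink API; the name `prefixKeys` is illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class PrefixKeys {
    // Expand "/foo/bar/README.txt" into "/", "/foo", "/foo/bar", "/foo/bar/README.txt".
    static List<String> prefixKeys(String path) {
        List<String> keys = new ArrayList<>();
        keys.add("/");
        int from = 1;
        int slash;
        while ((slash = path.indexOf('/', from)) >= 0) {
            keys.add(path.substring(0, slash)); // each ancestor folder
            from = slash + 1;
        }
        if (path.length() > 1) {
            keys.add(path); // the file itself
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(prefixKeys("/foo/bar/README.txt"));
        // [/, /foo, /foo/bar, /foo/bar/README.txt]
    }
}
```

Each returned prefix would then be paired with the source ID to form one of the keys above.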

Then I could use a window aggregation function to just:

* count the "open" events
* count the "close" events and sum their duration
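Both counts fit in a tiny per-key accumulator: an open count, a close count, and a running duration sum. A sketch in plain Java (class and method names are illustrative, not Flink's `AggregateFunction` interface):

```java
public class OpenCloseStats {
    long opens;
    long closes;
    long durationSum;

    // Fold one event into the accumulator.
    void add(String action, long duration) {
        if ("open".equals(action)) {
            opens++;
        } else if ("close".equals(action)) {
            closes++;
            durationSum += duration;
        }
    }

    // Average open time over the closes seen in this window.
    double averageOpenTime() {
        return closes == 0 ? 0.0 : (double) durationSum / closes;
    }

    public static void main(String[] args) {
        OpenCloseStats stats = new OpenCloseStats();
        stats.add("close", 67);
        stats.add("close", 97);
        stats.add("close", 196);
        System.out.println(stats.averageOpenTime()); // 120.0
    }
}
```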

Additionally, I am (naïvely) hoping that if a window has no events for a particular key, the memory/storage costs are zero for that key.

From what I can see, I could achieve this with a flatMap followed by a keyBy.

In other words, I take the events and flatMap them based on the path split on '/', returning a Tuple of the (to-be) key and the event. Then I can use keyBy on field 0 of the Tuple.
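Wired together against the Flink DataStream API, the job would look roughly like this (pseudocode sketch; the flatMap body and the aggregate are placeholders):

```
events
    .flatMap((event, out) -> {
        // emit one Tuple2 of ((source, prefix), event) per prefix of event.path
    })
    .keyBy(tuple -> tuple.f0)  // key = (source, path prefix)
    .window(TumblingEventTimeWindows.of(Time.minutes(X)))
    .aggregate(/* count opens; count closes and sum durations */);
```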

My ask:

Is the above design a good design? How would you achieve the end goal better? Do I need to worry about the many paths that are accessed rarely and would have an accumulator that stays at 0 unless there are events in that window... or is the accumulator for each distinct key eagerly purged after each fire trigger?

What gotchas do I need to look out for?

Thanks in advance, and apologies for the length.

-stephenc

Re: Reduce one event under multiple keys

Chesnay Schepler
This sounds reasonable to me.

I'm a bit confused by this question: "Additionally, I am (naïvely) hoping that if a window has no events for a particular key, the memory/storage costs are zero for that key."

Are you asking whether a key that was received in window X (as part of an event) is still present in window X+1? If so, then the answer is no; a key will only be present in a given window if an event was received that fits into that window.

On 08.02.2019 13:21, Stephen Connolly wrote:

Re: Reduce one event under multiple keys

Stephen Connolly


On Sun, 10 Feb 2019 at 09:09, Chesnay Schepler <[hidden email]> wrote:
This sounds reasonable to me.

I'm a bit confused by this question: "Additionally, I am (naïvely) hoping that if a window has no events for a particular key, the memory/storage costs are zero for that key."

Are you asking whether a key that was received in window X (as part of an event) is still present in window X+1? If so, then the answer is no; a key will only be present in a given window if an event was received that fits into that window.

To confirm:

So let's say I'm tracking the average time a file is open, per folder.

In window N we get the events:

{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}

So there will be aggregates stored for ("ca:fe:ba:be","/"), ("ca:fe:ba:be","/foo"), ("ca:fe:ba:be","/foo/bar"), ("ca:fe:ba:be","/foo/bar/README.txt"), etc

In window N+1 we do not get any events at all.

So the memory used by my aggregation functions from window N will be freed, and the storage will be effectively zero (modulo any follow-on processing that might be on a longer window)?

This seems to be what you are saying... in which case my naïve hope was not so naïve! w00t!




Re: Reduce one event under multiple keys

Fabian Hueske-2
Hi Stephen,

A window is created with the first record that is assigned to it.
If the windows are based on time and a key, then no window will be created (and no space occupied) if there is no record for that key and time interval.

Anyway, if tracking the number of open files & average opening time is your use case, you might want to implement the logic with a ProcessFunction instead of a window.
The reason is that time windows don't share state, i.e., the information about an opened but not yet closed file would not be "carried over" to the next window.
However, if you use a ProcessFunction, you are responsible for cleaning up the state.
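To make the distinction concrete: matching an open with a later close needs state that outlives any single window, e.g. a map from path to open timestamp. A minimal plain-Java sketch of the bookkeeping a ProcessFunction would have to keep (and clean up) itself; the names are illustrative, not Flink API:

```java
import java.util.HashMap;
import java.util.Map;

public class OpenFileTracker {
    // path -> timestamp of the as-yet-unmatched "open" event
    private final Map<String, Long> openSince = new HashMap<>();

    void onOpen(String path, long timestamp) {
        openSince.put(path, timestamp);
    }

    // Returns the open duration, or -1 if no matching "open" was seen.
    long onClose(String path, long timestamp) {
        Long openedAt = openSince.remove(path); // remove = the manual state cleanup
        return openedAt == null ? -1 : timestamp - openedAt;
    }

    int openCount() {
        return openSince.size();
    }

    public static void main(String[] args) {
        OpenFileTracker t = new OpenFileTracker();
        t.onOpen("/foo/bar/README.txt", 1000);
        System.out.println(t.onClose("/foo/bar/README.txt", 1067)); // 67
        System.out.println(t.onClose("/never/opened.txt", 2000));   // -1
    }
}
```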

Hope this helps,
Fabian

On Sun., 10 Feb. 2019 at 20:36, Stephen Connolly <[hidden email]> wrote:





Re: Reduce one event under multiple keys

Stephen Connolly


On Mon, 11 Feb 2019 at 09:42, Fabian Hueske <[hidden email]> wrote:
Hi Stephen,

A window is created with the first record that is assigned to it.
If the windows are based on time and a key, then no window will be created (and no space occupied) if there is no record for that key and time interval.

Anyway, if tracking the number of open files & average opening time is your use case, you might want to implement the logic with a ProcessFunction instead of a window.
The reason is that time windows don't share state, i.e., the information about an opened but not yet closed file would not be "carried over" to the next window.
However, if you use a ProcessFunction, you are responsible for cleaning up the state.

Ahh, but I am cheating by ensuring the events are rich enough that I do not need to match them.

I get the "open" events (they are not really "open" events - I have mapped my problem to an analogy... they might be more like build job start events... or not... I'm not at liberty to say ;-) ) because I need to count the number of "open"s per time period.

I get the "close" events, and they include the duration plus other information that can then be transformed into the required metrics... yes, I could derive the "open" from the "close" by subtracting the duration, but:

1. they would cross window boundaries quite often, leading to repeated fetch-update-write operations on the backing data store
2. they wouldn't be as "live", and one of the things we need to know is how many "open"s there are in the previous window... given some durations can be many days, waiting for the "close" event to create the "open" metric would not be a good plan.

Basically, I am pushing some of the calculations to the edge, where there is state that makes those calculations cheap, so that the rich events are *hopefully* easy to aggregate with simple aggregation functions that only need to maintain a running total... at least that's what the PoC I am experimenting with in Flink should show.
 




Re: Reduce one event under multiple keys

Fabian Hueske-2
Hi Stephen,

Sorry for the late response.
If you don't need to match open and close events, your approach of using a flatMap to fan out over the hierarchical folder structure and a window operator (or two, for opens and closes) for counting and aggregating should be a good design.

Best, Fabian

On Mon., 11 Feb. 2019 at 11:29, Stephen Connolly <[hidden email]> wrote:


On Mon, 11 Feb 2019 at 09:42, Fabian Hueske <[hidden email]> wrote:
Hi Stephen,

A window is created with the first record that is assigned to it.
If the windows are based on time and a key, than no window will be created (and not space be occupied) if there is not a first record for a key and time interval.

Anyway, if tracking the number of open files & average opening time is your use case, you might want to implement the logic with a ProcessFunction instead of a window.
The reason is that it is that time windows don't share state, i.e., the information about an opened but not yet closed file would not be "carried over" to the next window.
However, if you use a ProcessFunction, you are responsible for cleaning up the state.

Ahh but I am cheating by ensuring the events are rich enough that I do not need to match them.

I get the "open" (they are not really "open" events - I have mapped to an analogy... it might be more like a build job start events... or not... I'm not at liberty to say ;-) ) events because I need to count the number of "open"s per time period.

I get the "close" events and they include the duration plus other information that can then be transformed into the required metrics... yes I could derive the "open" from the "close" by subtracting the duration but:

1. they would cross window boundaries quite often, leading to repeated fetch-update-write operations on the backing data store
2. they wouldn't be as "live" and one of the things we need to know is how many "open"s there are in the previous window... given some durations can be many days, waiting for the "close" event to create the "open" metric would not be a good plan.

Basically, I am pushing some of the calculations to the edge where there is state that makes those calculations cheap and then the rich events are *hopefully* easy to aggregate with just simple aggregation functions that only need to maintain the running total... at least that's what the PoC I am experimenting with Flink should show 
 

Hope this helps,
Fabian

Am So., 10. Feb. 2019 um 20:36 Uhr schrieb Stephen Connolly <[hidden email]>:


On Sun, 10 Feb 2019 at 09:09, Chesnay Schepler <[hidden email]> wrote:
This sounds reasonable to me.

I'm a bit confused by this question: "Additionally, I am (naïevely) hoping that if a window has no events for a particular key, the memory/storage costs are zero for that key."

Are you asking whether a key that was received in window X (as part of an event) is still present in window x+1? If so, then the answer is no; a key will only be present in a given window if an event was received that fits into that window.

To confirm:

So let's say I'l tracking the average time a file is opened in folders.

In window N we get the events:

{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}

So there will be aggregates stored for ("ca:fe:ba:be","/"), ("ca:fe:ba:be","/foo"), ("ca:fe:ba:be","/foo/bar"), ("ca:fe:ba:be","/foo/bar/README.txt"), etc

In window N+1 we do not get any events at all.

So the memory used by my aggregation functions from window N will be freed and the storage will be effectively zero (modulo any follow on processing that might be on a longer window)

This seems to be what you are saying... in which case my naïeve hope was not so naïve! w00t!
 

On 08.02.2019 13:21, Stephen Connolly wrote:
Ok, I'll try and map my problem into something that should be familiar to most people.

Consider collection of PCs, each of which has a unique ID, e.g. ca:fe:ba:be, de:ad:be:ef, etc.

Each PC has a tree of local files. Some of the file paths are coincidentally the same names, but there is no file sharing between PCs.

I need to produce metrics about how often files are opened and how long they are open for.

I need for every X minute tumbling window not just the cumulative averages for each PC, but the averages for each file as well as the cumulative averegaes for each folder and their sub-folders.

I have a stream of events like

{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}{"source":"de:ad:be:ef","action":"open","path":"/foo/manchu/README.txt"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/User guide.txt","duration":"97"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/Admin guide.txt","duration":"196"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/manchu/README.txt"}
{"source":"de:ad:be:ef","action":"open","path":"/bar/foo/README.txt"}

So from that I would like to know stuff like:

ca:fe:ba:be had 4/X opens per minute in the X minute window
ca:fe:ba:be had 3/X closes per minute in the X minute window and the average time open was (67+97+196)/3=120... there is no guarantee that the closes will be matched with opens in the same window, which is why I'm only tracking them separately
de:ad:be:ef had 2/X opens per minute in the X minute window
ca:fe:ba:be /foo had 4/X opens per minute in the X minute window
ca:fe:ba:be /foo had 3/X closes per minute in the X minute window and the average time open was 120
de:ad:be:ef /foo had 1/X opens per minute in the X minute window
de:ad:be:ef /bar had 1/X opens per minute in the X minute window
de:ad:be:ef /foo/manchu had 1/X opens per minute in the X minute window
de:ad:be:ef /bar/foo had 1/X opens per minute in the X minute window
de:ad:be:ef /foo/manchu/README.txt had 1/X opens per minute in the X minute window
de:ad:be:ef /bar/foo/README.txt had 1/X opens per minute in the X minute window
etc
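
As a sanity check on those figures, the nine sample events can be replayed and counted per source (an illustrative Python sketch, not Flink code; paths are elided since only per-source totals are checked here):

```python
from collections import defaultdict

# The nine sample events above, reduced to the fields needed per source.
events = [
    {"source": "ca:fe:ba:be", "action": "open"},
    {"source": "ca:fe:ba:be", "action": "close", "duration": "67"},
    {"source": "de:ad:be:ef", "action": "open"},
    {"source": "ca:fe:ba:be", "action": "open"},
    {"source": "ca:fe:ba:be", "action": "open"},
    {"source": "ca:fe:ba:be", "action": "close", "duration": "97"},
    {"source": "ca:fe:ba:be", "action": "close", "duration": "196"},
    {"source": "ca:fe:ba:be", "action": "open"},
    {"source": "de:ad:be:ef", "action": "open"},
]

stats = defaultdict(lambda: {"opens": 0, "closes": 0, "duration": 0})
for e in events:
    s = stats[e["source"]]
    if e["action"] == "open":
        s["opens"] += 1
    else:
        s["closes"] += 1
        s["duration"] += int(e["duration"])

cafe = stats["ca:fe:ba:be"]
# ca:fe:ba:be: 4 opens, 3 closes, average open time 360/3 == 120
```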

What I think I want to do is turn each event into a series of events with different keys, so that

{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}

gets sent under the keys:

("ca:fe:ba:be","/")
("ca:fe:ba:be","/foo")
("ca:fe:ba:be","/foo/bar")
("ca:fe:ba:be","/foo/bar/README.txt")

Then I could use a window aggregation function to just:

* count the "open" events
* count the "close" events and sum their duration
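
Sketched as a pre-aggregating accumulator (plain Python here; in Flink this shape maps onto an `AggregateFunction`, whose `add`/`merge`/`getResult` contract the hypothetical names below mirror):

```python
class OpenCloseStats:
    """Running totals for one (source, path-prefix) key within one window."""

    def __init__(self):
        self.opens = 0
        self.closes = 0
        self.total_duration = 0

    def add(self, event):
        if event["action"] == "open":
            self.opens += 1
        else:  # "close" events carry a duration
            self.closes += 1
            self.total_duration += int(event["duration"])
        return self

    def merge(self, other):
        # Lets partial aggregates from parallel instances be combined.
        self.opens += other.opens
        self.closes += other.closes
        self.total_duration += other.total_duration
        return self

    def get_result(self):
        avg = self.total_duration / self.closes if self.closes else None
        return {"opens": self.opens, "closes": self.closes, "avg_open": avg}
```

Feeding it the three sample close durations (67, 97, 196) yields an average of 120, matching the figures above.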

Additionally, I am (naïvely) hoping that if a window has no events for a particular key, the memory/storage costs are zero for that key.

From what I can see, to achieve what I am trying to do, I could use a flatMap followed by a keyBy.

In other words, I take the events and flatMap them based on the path split on '/', returning a Tuple of the (to-be) key and the event. Then I can use keyBy on field 0 of the Tuple.
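
A sketch of that flatMap in plain Python (hypothetical names; in Flink this would be a `FlatMapFunction` whose output is then routed with `keyBy`):

```python
def fan_out(event):
    """Emit ((source, prefix), event) for '/' and every ancestor of event['path']."""
    parts = event["path"].strip("/").split("/")
    keys = ["/"] + ["/" + "/".join(parts[:i]) for i in range(1, len(parts) + 1)]
    return [((event["source"], key), event) for key in keys]

pairs = fan_out({"source": "ca:fe:ba:be", "action": "open",
                 "path": "/foo/bar/README.txt"})
keys = [key for key, _ in pairs]
# keys covers "/", "/foo", "/foo/bar" and the full file path for this source
```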

My ask:

Is the above design a good design? How would you achieve the end goal better? Do I need to worry about the many paths that are accessed rarely, whose accumulators would stay at 0 unless there are events in that window... or are the accumulators for each distinct key eagerly purged after each fired trigger?

What gotchas do I need to look out for?

Thanks in advance, and apologies for the length.

-stephenc


Re: Reduce one event under multiple keys

Stephen Connolly
Thanks! 

On Mon, 18 Feb 2019 at 12:36, Fabian Hueske <[hidden email]> wrote:
Hi Stephen,

Sorry for the late response.
If you don't need to match open and close events, your approach of using a flatMap to fan out on the hierarchical folder structure and a window operator (or two, for open and close) for counting and aggregating should be a good design.

Best, Fabian

On Mon, 11 Feb 2019 at 11:29, Stephen Connolly <[hidden email]> wrote:


On Mon, 11 Feb 2019 at 09:42, Fabian Hueske <[hidden email]> wrote:
Hi Stephen,

A window is created with the first record that is assigned to it.
If the windows are based on time and a key, then no window will be created (and no space will be occupied) if there is no first record for that key and time interval.

Anyway, if tracking the number of open files & average opening time is your use case, you might want to implement the logic with a ProcessFunction instead of a window.
The reason is that time windows don't share state, i.e., the information about an opened but not yet closed file would not be "carried over" to the next window.
However, if you use a ProcessFunction, you are responsible for cleaning up the state.

Ahh but I am cheating by ensuring the events are rich enough that I do not need to match them.

I get the "open" events (they are not really "open" events - I have mapped them to an analogy... they might be more like build job start events... or not... I'm not at liberty to say ;-) ) because I need to count the number of "open"s per time period.

I get the "close" events and they include the duration plus other information that can then be transformed into the required metrics... yes I could derive the "open" from the "close" by subtracting the duration but:

1. they would cross window boundaries quite often, leading to repeated fetch-update-write operations on the backing data store
2. they wouldn't be as "live" and one of the things we need to know is how many "open"s there are in the previous window... given some durations can be many days, waiting for the "close" event to create the "open" metric would not be a good plan.

Basically, I am pushing some of the calculations to the edge, where there is state that makes those calculations cheap, and then the rich events are *hopefully* easy to aggregate with simple aggregation functions that only need to maintain a running total... at least that's what the PoC I am experimenting with in Flink should show.
 

Hope this helps,
Fabian

On Sun, 10 Feb 2019 at 20:36, Stephen Connolly <[hidden email]> wrote:


On Sun, 10 Feb 2019 at 09:09, Chesnay Schepler <[hidden email]> wrote:
This sounds reasonable to me.

I'm a bit confused by this question: "Additionally, I am (naïvely) hoping that if a window has no events for a particular key, the memory/storage costs are zero for that key."

Are you asking whether a key that was received in window X (as part of an event) is still present in window X+1? If so, then the answer is no; a key will only be present in a given window if an event was received that fits into that window.

To confirm:

So let's say I'm tracking the average time a file is open, per folder.

In window N we get the events:

{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}

So there will be aggregates stored for ("ca:fe:ba:be","/"), ("ca:fe:ba:be","/foo"), ("ca:fe:ba:be","/foo/bar"), ("ca:fe:ba:be","/foo/bar/README.txt"), etc

In window N+1 we do not get any events at all.

So the memory used by my aggregation functions from window N will be freed and the storage will be effectively zero (modulo any follow-on processing that might be on a longer window).

This seems to be what you are saying... in which case my naïve hope was not so naïve! w00t!
 
