Custom Processing per window

6 messages

Custom Processing per window

Dhruv Kumar
Hi

I am a CS PhD student at UMN Twin Cities. I am trying to use Flink to implement some very specific use cases. (They may not seem relevant, but I need to implement them, or at least find out whether it is possible to implement them in Flink.)

Assumptions:
1. The data stream is of the form (key, value). We achieve this with the keyBy operation provided by the Flink API.
2. By emitting a key, I mean sending its aggregated value to a data sink.

1. For each tumbling event-time window and each key, I would like to aggregate the key's values until the aggregate crosses a threshold (the same threshold for all keys). As soon as a key's aggregated value crosses the threshold, I would like to emit that key. At the end of every tumbling window, all (key, aggregated value) pairs should be emitted, whether or not they have crossed the threshold.

2. For each tumbling event-time window, I would like to maintain an LRU cache that stores keys along with their aggregated values and their latest arrival times. The least recently used (LRU) key is the key whose latest arrival time is earlier than that of every other key in the cache. The cache has a limited size, so the number of unique keys in a window may exceed its capacity. Whenever a (key, value) pair arrives: if the key is already in the cache, its aggregated value is updated with the new value and its latest arrival time is set to the current event time; if the key is not in the cache and there is a free slot, the key is added. Once the cache is full and a key arrives that is not in it, the least recently used key should be emitted to make room for the new one. As in use case 1, at the end of every tumbling window, all (key, aggregated value) pairs remaining in the LRU cache are emitted.
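
The per-window behaviour of use case 2 can be sketched in plain Java. This is a model of the semantics only, not Flink code, and WindowLruCache is a hypothetical name. It assumes that records within one window arrive in event-time order, so access order can stand in for "latest arrival time", and that aggregation is a sum. java.util.LinkedHashMap constructed with accessOrder = true iterates from least to most recently accessed, so the LRU key is always the first entry.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Flink code) of one tumbling window's bounded LRU cache.
// Assumptions: values are summed, and records arrive in event-time order, so
// access order approximates "latest arrival time".
class WindowLruCache {
    private final int capacity;
    // accessOrder = true: iteration goes from least to most recently accessed.
    private final LinkedHashMap<String, Long> cache = new LinkedHashMap<>(16, 0.75f, true);
    private final List<String> emitted = new ArrayList<>();

    WindowLruCache(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        this.capacity = capacity;
    }

    // Processes one (key, value) record belonging to this window.
    void add(String key, long value) {
        Long current = cache.get(key); // get() also marks the key as most recently used
        if (current != null) {
            cache.put(key, current + value); // update the aggregate in place
            return;
        }
        if (cache.size() == capacity) {
            // Cache is full: the first entry in access order is the LRU key; emit and evict it.
            Map.Entry<String, Long> eldest = cache.entrySet().iterator().next();
            String eldestKey = eldest.getKey();
            emitted.add(eldestKey + "=" + eldest.getValue());
            cache.remove(eldestKey);
        }
        cache.put(key, value);
    }

    // End of the window: emit everything still cached, least recently used first.
    void closeWindow() {
        for (Map.Entry<String, Long> e : cache.entrySet()) {
            emitted.add(e.getKey() + "=" + e.getValue());
        }
        cache.clear();
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        WindowLruCache lru = new WindowLruCache(2);
        lru.add("a", 1);
        lru.add("b", 2);
        lru.add("a", 3);   // a becomes most recent, aggregate 4
        lru.add("c", 5);   // cache full, b is least recently used and is emitted
        lru.closeWindow(); // remaining a and c are emitted
        System.out.println(lru.emitted()); // [b=2, a=4, c=5]
        System.out.println(lru.emitted().equals(Arrays.asList("b=2", "a=4", "c=5")));
    }
}
```

With out-of-order events, the explicit latest arrival timestamps would have to be stored and compared instead of relying on access order.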

I would like to know how we can implement these algorithms using Flink. Any help would be greatly appreciated.

Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me

Re: Custom Processing per window

Jörn Franke
How would you start implementing it? Where are you stuck?

Did you already try to implement this?

On 18. Mar 2018, at 04:10, Dhruv Kumar <[hidden email]> wrote:

Re: Custom Processing per window

Dhruv Kumar
Task 1: I implemented it using a custom Trigger (see the attached file), and it seems to do what I want. I copied the code from EventTimeTrigger.java and overrode the onElement method.
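
The per-key logic such a trigger has to encode can be modelled without Flink. The following is a plain-Java sketch, not the attached trigger and not Flink API; ThresholdWindow and its methods are hypothetical. It assumes values are summed, that "crosses" means strictly greater than the threshold, and that an early emit resets that key's sum (the question does not say). In Flink's Trigger API this reset-or-not choice roughly corresponds to returning FIRE_AND_PURGE rather than FIRE from onElement. For brevity the model keeps all keys of one window in one object, whereas Flink evaluates each key's window separately.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Flink code) of use case 1 for one tumbling window.
// Assumptions: values are summed, "crosses" means sum > threshold,
// and an early emit restarts that key's sum from zero.
class ThresholdWindow {
    private final long threshold;
    // Running sums of keys that have not crossed the threshold yet.
    private final Map<String, Long> sums = new LinkedHashMap<>();
    private final List<String> emitted = new ArrayList<>();

    ThresholdWindow(long threshold) {
        this.threshold = threshold;
    }

    // Per-element decision, comparable to what a custom trigger decides in onElement.
    void add(String key, long value) {
        long sum = sums.getOrDefault(key, 0L) + value;
        if (sum > threshold) {
            emitted.add(key + "=" + sum); // emit early ...
            sums.remove(key);             // ... and start this key over from zero
        } else {
            sums.put(key, sum);           // keep aggregating
        }
    }

    // End of the window (the event-time timer): emit whatever is left, crossed or not.
    void closeWindow() {
        for (Map.Entry<String, Long> e : sums.entrySet()) {
            emitted.add(e.getKey() + "=" + e.getValue());
        }
        sums.clear();
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        ThresholdWindow w = new ThresholdWindow(10);
        w.add("a", 4);
        w.add("b", 3);
        w.add("a", 7);   // a reaches 11 > 10 and is emitted immediately
        w.add("b", 2);   // b = 5, still below the threshold
        w.closeWindow(); // b is emitted at the end of the window
        System.out.println(w.emitted()); // [a=11, b=5]
        System.out.println(w.emitted().equals(Arrays.asList("a=11", "b=5")));
    }
}
```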

Task 2: I need to keep the state (the LRU cache) for multiple keys in the same data structure, but keyed state appears to be scoped to a single key. Should I use operator state in some way? Can I use a data structure that is not directly managed by Flink? And what happens when the keys are spread across multiple machines?

Attachment: LazyAlgoTrigger.java (5K)

On Mar 19, 2018, at 02:04, Jörn Franke <[hidden email]> wrote:

Re: Custom Processing per window

Dhruv Kumar
In other words, when using the Flink streaming APIs, is it possible to decide whether to emit a particular key based on the state of another key in the same window?

Thanks!

On Mar 19, 2018, at 05:11, Dhruv Kumar <[hidden email]> wrote:

Re: Custom Processing per window

Fabian Hueske-2
Hi,

Data is partitioned by key across machines and state is kept per key. It is not possible to interact with two keys at the same time.
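
A simplified plain-Java illustration of the consequence: every record with a given key is routed to exactly one parallel instance, but two different keys can land on different instances, so no single instance holds the state of both. The modulo-of-hashCode rule and the KeyPartitioning name are assumptions for illustration only; Flink's real assignment goes through key groups and will generally produce a different mapping. Even when two keys share an instance, keyed state only exposes the state of the key currently being processed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of key partitioning (NOT Flink's exact key-group assignment).
class KeyPartitioning {
    // The same key always maps to the same instance index in [0, parallelism).
    static int instanceFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Groups incoming keys by the instance that would process them.
    static Map<Integer, List<String>> partition(List<String> keys, int parallelism) {
        Map<Integer, List<String>> byInstance = new TreeMap<>();
        for (String key : keys) {
            byInstance.computeIfAbsent(instanceFor(key, parallelism), i -> new ArrayList<>()).add(key);
        }
        return byInstance;
    }

    public static void main(String[] args) {
        // "a" and "b" end up on different instances, so neither instance sees both.
        System.out.println(partition(Arrays.asList("a", "b", "c", "d", "a"), 2));
        // prints {0=[b, d], 1=[a, c, a]}
    }
}
```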

Best, Fabian

2018-03-19 14:47 GMT+01:00 Dhruv Kumar <[hidden email]>:

Re: Custom Processing per window

Dhruv Kumar
Is there a way I can use operator state (instead of keyed state) to solve my issue?


On Mar 19, 2018, at 09:00, Fabian Hueske <[hidden email]> wrote:
