Watermark for each key?

Lasse Nedergaard
Hi.

We work with IoT data, and in some cases an IoT device delays its data transfer because it can't get network access. We would like to use Table API window aggregate functions per device to calculate some statistics, but for window aggregates to work we need to assign a watermark, and that watermark is shared by all devices. We can set an allowed lateness, but we can't set it to months.
So what we need is a watermark per device (per key), so that the window aggregate works on the timestamps delivered by each device and not on the global watermark.
Is that possible, or has anyone considered this feature?
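To make the problem concrete, here is a minimal plain-Python simulation (not Flink code) of a bounded-out-of-orderness watermark, recomputed per element as the largest event time seen so far minus a fixed bound. The function `late_events`, the constant `MAX_OUT_OF_ORDERNESS` and the sample data are hypothetical. Device `b` goes offline and later delivers buffered readings: with one shared watermark those readings fall behind it because device `a` has already pushed the clock forward, while a per-device watermark accepts them.

```python
from collections import defaultdict

MAX_OUT_OF_ORDERNESS = 5  # hypothetical watermark bound (event-time units)

def late_events(events, per_key):
    """Return the (device, event_time) events that arrive behind the watermark.

    per_key=False: one watermark shared by all devices.
    per_key=True:  a separate watermark for every device.
    The watermark is recomputed per element as (max event time seen) - bound.
    """
    max_seen = defaultdict(lambda: float("-inf"))
    late = []
    for device, ts in events:
        clock = device if per_key else "all devices"
        watermark = max_seen[clock] - MAX_OUT_OF_ORDERNESS
        if ts < watermark:
            late.append((device, ts))
        max_seen[clock] = max(max_seen[clock], ts)
    return late

# Device "b" goes offline after t=10 and later delivers buffered readings.
events = [("a", 10), ("b", 10), ("a", 200), ("a", 300), ("b", 20), ("b", 30)]
```

With `per_key=False`, the readings `("b", 20)` and `("b", 30)` are reported as late; with `per_key=True`, nothing is late.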

Best 

Lasse Nedergaard

Re: Watermark for each key?

Till Rohrmann
Hi Lasse,

At the moment, this is not supported out of the box by Flink. The community has thought about this feature but has not implemented it so far. Unfortunately, I'm also not aware of an easy workaround that could be done in user code.

Cheers,
Till

(In reply to Lasse Nedergaard, Wed, Apr 24, 2019 at 3:26 PM)

Re: Watermark for each key?

Lasse Nedergaard
Thanks, Till.

What about this workaround?
After the watermark assignment, I split the stream into elements that fit within the watermark (s1) and those that don't (s2). I process s1 with the Table API using a window aggregate based on the watermark, and I handle s2 with an unbounded, non-windowed aggregate with IdleStateRetentionTime, so that state is removed once my devices are up to date again. I then merge the two outputs and continue.
This way I handle 99% of the data the standard way and only keep state for the late data.

Does that make sense, and would it work?
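A rough plain-Python simulation of this split, assuming tumbling windows, a single global watermark, and a per-window sum as the statistic. All names and constants are hypothetical; element positions stand in for processing time, and the idle eviction only approximates Flink's idle state retention (which is based on processing time). An element goes to s2 when the window it belongs to has already been closed by the watermark.

```python
from collections import defaultdict

# Hypothetical settings for this simulation.
WINDOW = 10                # tumbling window size (event-time units)
MAX_OUT_OF_ORDERNESS = 5   # bound used by the global watermark
IDLE_RETENTION = 3         # arrivals a late key's state may stay untouched

def run_workaround(events):
    """events: (device, event_time, value) tuples in arrival order.

    Returns the merged output stream of (source, device, window_start, sum):
    "s1" rows come from the watermark-driven window aggregate, "s2" rows are
    upserts from the unbounded aggregate over late elements.
    """
    out = []
    max_seen = float("-inf")
    s1_windows = defaultdict(int)   # (device, window_start) -> running sum
    s2_state = {}                   # (device, window_start) -> [sum, last_arrival]
    for arrival, (device, ts, value) in enumerate(events):
        key = (device, ts - ts % WINDOW)
        watermark = max_seen - MAX_OUT_OF_ORDERNESS
        if key[1] + WINDOW - 1 <= watermark:
            # s2: the element's window was already closed by the watermark.
            acc = s2_state.setdefault(key, [0, arrival])
            acc[0] += value
            acc[1] = arrival
            out.append(("s2", device, key[1], acc[0]))
        else:
            # s1: regular event-time tumbling-window aggregate.
            s1_windows[key] += value
        max_seen = max(max_seen, ts)
        watermark = max_seen - MAX_OUT_OF_ORDERNESS
        # Emit s1 windows whose last timestamp the watermark has reached.
        for k in sorted(k for k in s1_windows if k[1] + WINDOW - 1 <= watermark):
            out.append(("s1", k[0], k[1], s1_windows.pop(k)))
        # Evict s2 state that has been idle too long (stand-in for idle state retention).
        for k in [k for k, (_, last) in s2_state.items() if arrival - last > IDLE_RETENTION]:
            del s2_state[k]
    return out

events = [("a", 1, 1), ("b", 2, 5), ("a", 25, 1), ("b", 3, 7),
          ("a", 26, 1), ("a", 27, 1), ("a", 28, 1), ("a", 29, 1), ("b", 4, 2)]
merged = run_workaround(events)
```

In this run, window `("b", 0)` shows up from both branches: s1 emits the on-time part (5) and s2 emits the late part (7), so the merged stream carries partial results for that window. The final straggler `("b", 4, 2)` arrives after the late state for that window was evicted, so it starts a fresh aggregate (2).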

Best regards
Lasse Nedergaard

(In reply to Till Rohrmann, Apr 24, 2019 at 19:00)

Re: Watermark for each key?

Congxian Qiu
Someone working in IoT also asked me whether Flink supports per-key watermarks.

I'm not sure, but perhaps we can compute the statistics by manipulating raw state ourselves: we keep separate state for every key, and whenever an element for a key arrives, we extract its timestamp and check whether we need to send a result downstream (like a window trigger firing). This way we can also tolerate delayed data.
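A minimal plain-Python sketch of this idea, assuming tumbling windows and a sum as the statistic. Each device keeps its own event-time clock (its max timestamp minus a bound) and its own open windows, and windows are emitted only when a new element for that device arrives. The class `PerKeyWindows` and the constants are hypothetical; in Flink this state would presumably live in keyed state, but this is not Flink code.

```python
from collections import defaultdict

# Hypothetical settings for this sketch.
WINDOW = 10                # tumbling window size (event-time units)
MAX_OUT_OF_ORDERNESS = 5   # per-device out-of-orderness bound

class PerKeyWindows:
    """Keeps a per-device event-time clock and per-device window sums."""

    def __init__(self):
        self.max_seen = {}                 # device -> max event time seen
        self.windows = defaultdict(dict)   # device -> {window_start: sum}

    def process(self, device, ts, value):
        """Handle one element and return the (device, window_start, sum) results it fires."""
        clock = self.max_seen.get(device, float("-inf")) - MAX_OUT_OF_ORDERNESS
        start = ts - ts % WINDOW
        if start + WINDOW - 1 <= clock:
            return []  # late even by this device's own clock; dropped in this sketch
        wins = self.windows[device]
        wins[start] = wins.get(start, 0) + value
        self.max_seen[device] = max(self.max_seen.get(device, ts), ts)
        clock = self.max_seen[device] - MAX_OUT_OF_ORDERNESS
        # Fire every window of this device whose end its own clock has passed.
        ready = sorted(s for s in wins if s + WINDOW - 1 <= clock)
        return [(device, s, wins.pop(s)) for s in ready]

p = PerKeyWindows()
results = []
for device, ts, value in [("a", 1, 1), ("b", 2, 5), ("a", 25, 1), ("b", 8, 2), ("b", 16, 1)]:
    results.extend(p.process(device, ts, value))
```

Device `b`'s window starting at 0 collects both of its readings (5 and 2) even though device `a`'s clock is already at 25. One limitation of this sketch: a device's windows are only emitted when another element for that same device arrives, since nothing fires them on a timer.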


Best, Congxian

(In reply to Lasse Nedergaard, Apr 25, 2019, 01:58 +0800)

Re: Watermark for each key?

Till Rohrmann
Your proposal could probably also be implemented using Flink's support for allowed lateness when defining a window [1]. It is based on essentially the same idea: some elements may violate the watermark semantics and need to be handled separately.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#late-elements
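A plain-Python simulation of allowed-lateness semantics for windows, for comparison with the s1/s2 split. In the DataStream API this corresponds to calling `allowedLateness(...)` on a windowed stream, optionally with `sideOutputLateData(...)` to capture elements that are too late. The model: a window first fires when the watermark passes its end, its state is kept for the allowed lateness, late elements within that period update the state and fire the window again, and anything later goes to the side output. The constants and function name are hypothetical, and the watermark is recomputed per element for simplicity.

```python
from collections import defaultdict

# Hypothetical settings for this simulation.
WINDOW = 10                # tumbling window size (event-time units)
MAX_OUT_OF_ORDERNESS = 5   # watermark bound
ALLOWED_LATENESS = 20      # how long window state is kept after the window fires

def run_with_lateness(events):
    """events: (key, event_time, value) in arrival order.

    Returns (fired, side_output): fired holds (key, window_start, sum) results,
    including updated re-firings for late elements; side_output holds elements
    that arrived after the window's state was already cleaned up.
    """
    state = defaultdict(int)   # (key, window_start) -> running sum
    fired_once = set()
    fired, side_output = [], []
    max_seen = float("-inf")
    for key, ts, value in events:
        watermark = max_seen - MAX_OUT_OF_ORDERNESS
        start = ts - ts % WINDOW
        max_ts = start + WINDOW - 1              # last timestamp belonging to the window
        if max_ts + ALLOWED_LATENESS <= watermark:
            side_output.append((key, ts, value))  # too late: window state is gone
        else:
            k = (key, start)
            state[k] += value
            if max_ts <= watermark:              # late, but within allowed lateness
                fired.append((key, start, state[k]))  # fire again with the updated sum
                fired_once.add(k)
        max_seen = max(max_seen, ts)
        watermark = max_seen - MAX_OUT_OF_ORDERNESS
        for k in sorted(state):
            k_max_ts = k[1] + WINDOW - 1
            if k_max_ts <= watermark and k not in fired_once:
                fired.append((k[0], k[1], state[k]))  # regular on-time firing
                fired_once.add(k)
            if k_max_ts + ALLOWED_LATENESS <= watermark:
                del state[k]                           # clean up window state
                fired_once.discard(k)
    return fired, side_output

fired, late = run_with_lateness(
    [("a", 1, 1), ("a", 24, 1), ("a", 3, 2), ("a", 60, 1), ("a", 4, 3)])
```

Window 0 fires once on time (sum 1) and again when the late element at t=3 arrives (sum 3); the element at t=4 arrives after the window's state was cleaned up and ends in the side output. As with the original proposal, this only helps if the allowed lateness covers the longest expected device outage.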

Cheers,
Till

(In reply to Congxian Qiu, Thu, Apr 25, 2019 at 10:21 AM)