How should I process a cumulative counter?

Larry Aspen
Hi,

I'm evaluating Flink for our company's IoT use case and read a blog post
by Fabian Hueske from 2015 [1]. We have a similar situation except the
sensor is sending the value of a cumulative counter instead of a count.
We would like to calculate the sum of deltas of consecutive cumulative
counter values that occur during a time window.

Here is a scenario of a cumulative counter measuring runtime in seconds
and a machine starting for the first time at 12:00:00 and running for
the whole hour (the sensor records a value when the machine starts,
every 15 minutes, and on each hour change):

timestamp, cumulative counter value in seconds
12:00:00, 0
12:15:00, 900
12:30:00, 1800
12:45:00, 2700
13:00:00, 3600

This would produce the following deltas:
12:00:00, 900 - 0 = 900
12:15:00, 1800 - 900 = 900
12:30:00, 2700 - 1800 = 900
12:45:00, 3600 - 2700 = 900

We would then sum the deltas to get runtime in seconds for the hour:
900 + 900 + 900 + 900 = 3600
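A minimal sketch in plain Python (not Flink API) of the calculation above: pair each reading with the next one, label the delta with the earlier timestamp, and sum the deltas. The readings are the ones from the scenario; the function name `deltas` is only for illustration.

```python
# Readings from the scenario: (timestamp, cumulative runtime in seconds).
readings = [
    ("12:00:00", 0),
    ("12:15:00", 900),
    ("12:30:00", 1800),
    ("12:45:00", 2700),
    ("13:00:00", 3600),
]


def deltas(readings):
    """Delta between each reading and the next, labeled with the earlier timestamp."""
    return [
        (ts, next_value - value)
        for (ts, value), (_, next_value) in zip(readings, readings[1:])
    ]


hour_deltas = deltas(readings)
total = sum(delta for _, delta in hour_deltas)
print(hour_deltas)
print(total)  # 3600
```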

What would be a good way to handle this kind of calculation in Flink?

I have already tried using a tumbling event time window of one hour,
but then the last value (13:00:00) is only part of the next window, so
the delta for 12:45:00 is missing and the sum is 900 + 900 + 900 = 2700.
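The following plain-Python sketch (an illustration, not Flink code) reproduces this result: a one-hour tumbling window starting at 12:00:00 covers [12:00:00, 13:00:00), so the 13:00:00 reading is assigned to the next window and the 12:45:00 delta cannot be computed inside the first one. Timestamps are converted to seconds since midnight; `to_seconds`, `tumbling_window_start`, and `window_sum` are hypothetical helpers.

```python
HOUR = 3600  # window size in seconds


def to_seconds(hhmmss):
    """Convert "HH:MM:SS" to seconds since midnight."""
    h, m, s = map(int, hhmmss.split(":"))
    return h * 3600 + m * 60 + s


def tumbling_window_start(ts, size=HOUR):
    """Start of the tumbling window containing ts; the window is [start, start + size)."""
    return ts - ts % size


def window_sum(window_readings):
    """Sum of deltas between consecutive readings that fall inside one window."""
    values = [value for _, value in sorted(window_readings)]
    return sum(b - a for a, b in zip(values, values[1:]))


readings = [
    (to_seconds(t), v)
    for t, v in [("12:00:00", 0), ("12:15:00", 900), ("12:30:00", 1800),
                 ("12:45:00", 2700), ("13:00:00", 3600)]
]

# Group readings by the tumbling window they are assigned to.
windows = {}
for ts, value in readings:
    windows.setdefault(tumbling_window_start(ts), []).append((ts, value))

print(window_sum(windows[to_seconds("12:00:00")]))  # 2700: the 12:45 delta is lost
print(windows[to_seconds("13:00:00")])             # only the 13:00:00 reading
```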

I have also tried a sliding event time window of two hours where the sum
is calculated for the first hour. This produces the correct sum in this
scenario but fails if the next value arrives in a later hour, e.g. values
at 12:45:00 and 14:00:00, i.e. the machine is shut down between 12:45:00
and 13:00:00 and started again at 14:00:00.

My latest attempt has been to use a global window where I try to keep
the values for the last two hours and calculate the sum for the older
hour. This seems to work in my experiments where I read values from
a file and use a parallelism of one. If I increase the parallelism, the
values are processed out of order, and the results are incorrect because
older values are received after newer values, which causes them to be
evicted.
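A small plain-Python illustration of why arrival order matters. With a hypothetical interleaved arrival order (timestamps in seconds after 12:00:00), computing deltas in arrival order produces negative and inflated deltas, while the same readings sorted by timestamp give the expected 900-second deltas. In a streaming job, sorting is only safe for readings that are known to be complete up to some event time, which is what Flink's watermarks are meant to signal.

```python
# Hypothetical arrival order from parallel readers; timestamps are seconds after 12:00:00.
arrived = [(0, 0), (1800, 1800), (900, 900), (3600, 3600), (2700, 2700)]


def consecutive_deltas(readings):
    """Deltas between consecutive readings in the order given."""
    values = [value for _, value in readings]
    return [b - a for a, b in zip(values, values[1:])]


in_arrival_order = consecutive_deltas(arrived)
in_event_time_order = consecutive_deltas(sorted(arrived))  # sort by timestamp first
print(in_arrival_order)     # [1800, -900, 2700, -900]
print(in_event_time_order)  # [900, 900, 900, 900]
```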

Any advice on this would be appreciated.

Best regards,
Larry Aspen

[1] https://flink.apache.org/news/2015/12/04/Introducing-windows.html

Re: How should I process a cumulative counter?

Aljoscha Krettek
Hi Larry,

the basic problem for your use case is that window boundaries are
inclusive for the start timestamp and exclusive for the end timestamp.
It's set up like this to ensure that consecutive tumbling windows don't
overlap. This is only a function of how our `WindowAssigner` works, so
it could be done differently in a different system.

Have you tried using a sliding window where the `slide` is `size - 1ms`?
With this, you would ensure that elements that fall exactly on the
boundary, i.e. your hourly sensor updates, would end up in both of the
consecutive windows. It seems a bit unorthodox but could work in your
case.
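A plain-Python re-implementation, written for illustration, of how sliding event-time windows are assigned with offset 0 (to my understanding this mirrors the rule in Flink's `SlidingEventTimeWindows`): window starts are multiples of the slide, and an element goes into every window `[start, start + size)` that contains it. With toy units (size 10, slide 9, i.e. size - 1), an element exactly at a window start falls into two consecutive windows and any other element into just one. Note that the boundaries are multiples of the slide, not of the size, so an element at 10 is not duplicated.

```python
def sliding_windows(ts, size, slide):
    """All [start, end) windows containing ts, assuming offset 0 and ts >= 0."""
    windows = []
    start = ts - ts % slide  # latest window start at or before ts
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return windows


# Toy units so the numbers stay readable: size 10, slide 9 (= size - 1).
print(sliding_windows(9, size=10, slide=9))   # [(9, 19), (0, 10)]
print(sliding_windows(5, size=10, slide=9))   # [(0, 10)]
print(sliding_windows(10, size=10, slide=9))  # [(9, 19)]
```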

Best,
Aljoscha

On 2021/01/08 08:56, Larry Aspen wrote:

>[...]

Re: How should I process a cumulative counter?

Larry Aspen
Hi Aljoscha,

thank you for your reply.

On 2021/01/08 15:44, Aljoscha Krettek wrote:
>the basic problem for your use case is that window boundaries are
>inclusive for the start timestamp and exclusive for the end timestamp.

That's true. What further complicates matters is that the last value of
the window (which should also be the first value of the next window)
might not have exactly the end timestamp of the one-hour window; it
could even be days in the future if the sensor is powered off, for
example, over a weekend.

>It's set up like this to ensure that consecutive tumbling windows don't
>overlap. This is only a function of how our `WindowAssigner` works, so
>it could be done differently in a different system.

I have tried to learn a little bit about the `WindowAssigner` system.
I think that if I could assign an element to two windows, I could
process the cumulative counter correctly. The issue I had with a
`WindowAssigner` was that I didn't seem to have a way to discover what
other windows exist. I wanted to assign the element both to the window
based on the element's timestamp and to the previous window. Here is an
illustration of what I mean:

W1 (16:00 - 17:00)              W2 (09:00 - 10:00)
+-----------+                   +-----------+
|   a       |  ...sensor off... |       b   |
+-----------+                   +-----------+

I would like to assign `b` to windows W2 and W1 so that I can calculate
runtime during W1 as b - a. The value `a` was recorded when the sensor
started. Because there are no other values in W1, the sensor must have
been shut down within 15 minutes. The value `b` was recorded the
following day when the sensor was started the next time. By calculating
b - a I can find out for how many seconds the sensor was running during
W1 (the result would be between 0 and 15 minutes, i.e. between 0 and
900 seconds).
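A minimal plain-Python sketch of the attribution rule described above, independent of the windowing API: sort the readings by timestamp and credit each delta to the hour in which its earlier reading falls. The timestamps and counter values for `a` and `b` are hypothetical, and `hourly_runtime` is an illustrative name. The rule assumes, as the sensor's behavior suggests, that a value is recorded on every hour change while the machine runs, so a delta can only cross an hour boundary if the machine was off at that boundary.

```python
HOUR = 3600
DAY = 24 * HOUR


def hourly_runtime(readings):
    """Credit each delta to the hour in which the *earlier* reading falls.

    readings: (timestamp_seconds, cumulative_seconds) pairs in any order.
    Returns {hour_start_seconds: runtime_seconds}.
    """
    ordered = sorted(readings)
    totals = {}
    for (ts_a, a), (_, b) in zip(ordered, ordered[1:]):
        hour = ts_a - ts_a % HOUR
        totals[hour] = totals.get(hour, 0) + (b - a)
    return totals


# Hypothetical values: `a` at 16:05 on day 0, `b` at 09:02 on day 1.
a = (16 * HOUR + 5 * 60, 50_000)
b = (DAY + 9 * HOUR + 2 * 60, 50_600)
print(hourly_runtime([a, b]))  # {57600: 600}, i.e. 600 s credited to 16:00-17:00
```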

>Have you tried using a sliding window where the `slide` is `size - 1ms`?
>With this, you would ensure that elements that fall exactly on the
>boundary, i.e. your hourly sensor updates, would end up in both of the
>consecutive windows. It seems a bit unorthodox but could work in your
>case.

I've only tried a sliding window with a size of two hours and a `slide`
of one hour. My idea was to keep the window start and end aligned with
full hours. If I were to use a `slide` of `size - 1ms`, wouldn't that
cause a widening misalignment with full hours?
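A quick plain-Python check of the misalignment question, assuming offset 0 and that window starts are multiples of the slide counted from the Unix epoch in milliseconds (my understanding of Flink's sliding assigner). For a hypothetical reading at 2021-01-08 13:00:00 UTC, the latest window start is 447,253 ms (about 7.5 minutes) before 13:00:00, so that reading is not on a window boundary and lands in only one window. Because size and slide differ by 1 ms, the lag equals the number of whole hours since the epoch, so it grows by 1 ms every hour.

```python
from datetime import datetime, timezone

SIZE_MS = 60 * 60 * 1000  # one hour
SLIDE_MS = SIZE_MS - 1    # slide = size - 1ms


def latest_window_start(ts_ms, slide_ms=SLIDE_MS):
    """Latest window start at or before ts_ms (offset 0: starts are multiples of the slide)."""
    return ts_ms - ts_ms % slide_ms


# Hypothetical hourly reading at 13:00:00 UTC on 2021-01-08.
ts_ms = int(datetime(2021, 1, 8, 13, 0, tzinfo=timezone.utc).timestamp()) * 1000
start_ms = latest_window_start(ts_ms)

lag_ms = ts_ms - start_ms
print(lag_ms)            # 447253, i.e. about 7.5 minutes
print(ts_ms // SIZE_MS)  # 447253 whole hours since the epoch
# The previous window ends at or before the reading, so the reading is in one window only.
print(start_ms - SLIDE_MS + SIZE_MS <= ts_ms)  # True
```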

My issue with a sliding window is that I can't know how far in the
future the value `b` is. Therefore I can't define a window size that is
long enough to include both values `a` and `b`. Here is an illustration
of that situation:

W1 (16:00 - 18:00)              W2 (09:00 - 10:00)
+-----------------------+       +------------
|   a       |           |  ...  |       b   | ...
+-----------------------+       +------------

Here the window size is two hours and the `slide` is one hour. I would
calculate the runtime for the first half of the window (16:00 - 17:00).
The problem is that the value `b` is so far in the future that it isn't
assigned to window W1, so I can't calculate the runtime for the hour
16:00 - 17:00.

Best regards,
Larry

Re: How should I process a cumulative counter?

Aljoscha Krettek
Hi Larry,

By now, it seems to me that the windowing API might not be the right
solution for your use case. The fact that sensors can shut down
arbitrarily makes it hard to calculate what window an event should fall
into.

Have you tried looking into `ProcessFunction`? With it you can keep
state and set timers to fire, based on either event time or processing
time. You could store your sensor data in state, and calculate results
and evict events when the timers fire.
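A plain-Python simulation of that idea for a single sensor key, under stated assumptions: readings are buffered until the watermark passes them, processed in timestamp order, and each delta is credited to the hour of its earlier reading; an hour is emitted once a reading at or after its end has been processed. `HourlyRuntime`, `on_element`, and `on_watermark` are hypothetical names, not Flink API. In an actual Flink job, the buffer, the last reading, and the open totals would presumably live in keyed state (for example `ListState`, `ValueState`, and `MapState`) inside a `KeyedProcessFunction`, with `on_watermark` corresponding to `onTimer` callbacks registered via `registerEventTimeTimer`. Late readings (behind an already-processed watermark) are not handled here.

```python
import heapq

HOUR = 3600


class HourlyRuntime:
    """Stand-in for a keyed process function, one instance per sensor (hypothetical)."""

    def __init__(self):
        self.buffer = []  # min-heap of (timestamp, value) not yet covered by a watermark
        self.last = None  # last processed (timestamp, value)
        self.totals = {}  # hour_start -> runtime seconds, for hours still open

    def on_element(self, ts, value):
        heapq.heappush(self.buffer, (ts, value))

    def on_watermark(self, watermark):
        """Process buffered readings with ts <= watermark; return completed hours."""
        while self.buffer and self.buffer[0][0] <= watermark:
            ts, value = heapq.heappop(self.buffer)
            if self.last is not None:
                prev_ts, prev_value = self.last
                hour = prev_ts - prev_ts % HOUR
                self.totals[hour] = self.totals.get(hour, 0) + value - prev_value
            self.last = (ts, value)
        # An hour is final once a reading at or after its end has been processed,
        # because the delta crossing its end has then been credited to it.
        done = []
        if self.last is not None:
            for hour in sorted(self.totals):
                if hour + HOUR <= self.last[0]:
                    done.append((hour, self.totals.pop(hour)))
        return done


fn = HourlyRuntime()
for ts, value in [(43200, 0), (45000, 1800), (44100, 900), (46800, 3600), (45900, 2700)]:
    fn.on_element(ts, value)  # out of order on purpose
first_hour = fn.on_watermark(46800)
print(first_hour)  # [(43200, 3600)]

# Sensor switched off after a reading at 16:05 (hypothetical counter values).
weekend = HourlyRuntime()
weekend.on_element(57900, 50_000)
still_open = weekend.on_watermark(61200)  # watermark at 17:00, no later reading yet
weekend.on_element(86400 + 32520, 50_600)  # next day, 09:02
closed = weekend.on_watermark(86400 + 32520)
print(still_open, closed)  # [] [(57600, 600)]
```

The second example corresponds to the weekend case: the 16:00 hour stays open until the next day's first reading arrives, however late that is.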

Best,
Aljoscha