Hi,
I'm evaluating Flink for our company's IoT use case and read a blog post
by Fabian Hueske from 2015 [1]. We have a similar situation, except the
sensor is sending the value of a cumulative counter instead of a count.
We would like to calculate the sum of the deltas of consecutive cumulative
counter values that occur during a time window.

Here is a scenario of a cumulative counter measuring runtime in seconds,
with a machine starting for the first time at 12:00:00 and running for
the whole hour (the sensor records values when it starts, every 15 minutes
and on hour change):

timestamp, cumulative counter value in seconds
12:00:00, 0
12:15:00, 900
12:30:00, 1800
12:45:00, 2700
13:00:00, 3600

This would produce the following deltas:
12:00:00, 900 - 0 = 900
12:15:00, 1800 - 900 = 900
12:30:00, 2700 - 1800 = 900
12:45:00, 3600 - 2700 = 900

We would then sum the deltas to get the runtime in seconds for the hour:
900 + 900 + 900 + 900 = 3600

What would be a good way to handle this kind of calculation in Flink?

I have already tried using a tumbling event time window of one hour,
but then the last value is only part of the next window, so the delta
of 12:45:00 is missing and the sum is 900 + 900 + 900 = 2700.

I have also tried a sliding event time window of two hours where the sum
is calculated for the first hour. This produces the correct sum in this
scenario, but fails if the next value comes in a later hour (e.g. values
at 12:45:00 and 14:00:00, i.e. the machine is shut down between 12:45:00
and 13:00:00 and started again at 14:00:00).

My latest attempt has been to use a global window where I try to keep
the values for the last two hours and calculate the sum for the older
hour. This seems to work in my experiments where I read values from
a file and use a parallelism of one. If I increase the parallelism, the
values are processed out of order and the results are incorrect, as
older values are received after newer values, which causes them to be
evicted.

Any advice on this would be appreciated.

Best regards,
Larry Aspen

[1] https://flink.apache.org/news/2015/12/04/Introducing-windows.html
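P.S. For reference, roughly the shape of my tumbling window attempt (a
simplified, untested sketch in the Java DataStream API; SensorReading is
a placeholder POJO with String sensorId, long timestamp and long counter
fields, and timestamps/watermarks are assigned upstream):

DataStream<SensorReading> readings = ...; // timestamps/watermarks already assigned

readings
    .keyBy(r -> r.sensorId)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .process(new ProcessWindowFunction<SensorReading, Long, String, TimeWindow>() {
        @Override
        public void process(String sensorId, Context ctx,
                            Iterable<SensorReading> elements, Collector<Long> out) {
            // For a monotonically increasing counter, the sum of the deltas of
            // consecutive values inside the window equals last value - first value,
            // i.e. max - min.
            long min = Long.MAX_VALUE;
            long max = Long.MIN_VALUE;
            for (SensorReading r : elements) {
                min = Math.min(min, r.counter);
                max = Math.max(max, r.counter);
            }
            // The reading that closes the hour (13:00:00 in the example) belongs
            // to the next window, so this sum is missing the last delta of the hour.
            out.collect(max - min);
        }
    });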
Hi Larry,
the basic problem for your use case is that window boundaries are
inclusive for the start timestamp and exclusive for the end timestamp.
It's set up like this to ensure that consecutive tumbling windows don't
overlap. This is only a function of how our `WindowAssigner` works, so
it could be done differently in a different system.

Have you tried using a sliding window where the `slide` is `size - 1ms`?
With this, you would ensure that elements that fall exactly on the
boundary, i.e. your hourly sensor updates, end up in both of the
consecutive windows. It seems a bit unorthodox but could work in your
case.

Best,
Aljoscha
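P.S. Concretely, I mean something like this (untested sketch, your stream
and field names will of course differ):

readings
    .keyBy(r -> r.sensorId)
    .window(SlidingEventTimeWindows.of(
        Time.hours(1),                                         // size
        Time.milliseconds(Time.hours(1).toMilliseconds() - 1)  // slide = size - 1ms
    ))
    ...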
Hi Aljoscha,
thank you for your reply.

On 2021/01/08 15:44 Aljoscha Krettek wrote:
>the basic problem for your use case is that window boundaries are
>inclusive for the start timestamp and exclusive for the end timestamp.

That's true. What further complicates matters is that the last value of
the window (which should also be the first value of the next window)
might not have exactly the end timestamp of the one hour window, but
could be even days in the future if the sensor is powered off, for
example, over a weekend.

>It's set up like this to ensure that consecutive tumbling windows don't
>overlap. This is only a function of how our `WindowAssigner` works, so
>it could be done differently in a different system.

I have tried to learn a little bit about the `WindowAssigner` system.
I think that if I could assign an element to two windows, I could
process the cumulative counter correctly. The issue I had with a
`WindowAssigner` was that I didn't seem to have a way to discover what
other windows exist. I would have wanted to assign the element to a
window based on the element's timestamp and also to the previous window.
Here is an illustration of what I mean:

W1 (16:00 - 17:00)                      W2 (09:00 - 10:00)
+-----------+                           +-----------+
| a         |     ...sensor off...      | b         |
+-----------+                           +-----------+

I would like to assign b to windows W2 and W1 so that I can calculate
the runtime during W1 as b - a. The value `a` has been recorded when the
sensor started. Because there are no other values, the sensor was shut
down within 15 minutes. The value `b` has been recorded the following
day when the sensor was started the next time. By calculating b - a
I can find out for how many seconds the sensor was running during
window W1 (the result would be between 0 and 15 minutes, or 0 and 900
seconds).

>Have you tried using a sliding window where the `slide` is `size - 1ms`?
>With this, you would ensure that elements that fall exactly on the
>boundary, i.e. your hourly sensor updates, end up in both of the
>consecutive windows. It seems a bit unorthodox but could work in your
>case.

I've only tried a sliding window with a size of two hours and a `slide`
of one hour. My idea was to keep the window start and end aligned with
full hours. If I were to use a `slide` of `size - 1ms`, wouldn't that
cause a widening misalignment with full hours?

My issue with a sliding window is that I can't know how far in the
future the value `b` is. Therefore I can't define a window size that is
long enough to include both values `a` and `b`. Here is an illustration
of that situation:

W1 (16:00 - 18:00)                      W2 (09:00 - 10:00)
+-----------------------+               +------------
| a          |          |     ...       | b            ...
+-----------------------+               +------------

Here the window size is two hours and the `slide` is one hour. I would
calculate the runtime for the first half of the window (16:00 - 17:00).
The problem is that the value `b` is so far in the future that it isn't
assigned to the window W1, so I can't calculate the runtime for the hour
16:00 - 17:00.

Best regards,
Larry
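P.S. To make the `WindowAssigner` point concrete, this is roughly the kind
of assigner I was thinking about (illustrative only, the class name is made
up). The comments show where it falls short:

import java.util.Arrays;
import java.util.Collection;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class CurrentAndPreviousHourAssigner extends WindowAssigner<Object, TimeWindow> {

    private static final long HOUR = 60 * 60 * 1000L;

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        // Only the element and its own timestamp are available here, so the
        // best I can do is assign it to its own hour and to the hour before it.
        // There is no way to find out in which window the *previous* value of
        // the sensor actually was, so this breaks down as soon as the gap is
        // longer than one hour (e.g. the overnight case above).
        long start = TimeWindow.getWindowStartWithOffset(timestamp, 0, HOUR);
        return Arrays.asList(
                new TimeWindow(start, start + HOUR),
                new TimeWindow(start - HOUR, start));
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}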
Hi Larry,
by now it seems to me that the windowing API might not be the right
solution for your use case. The fact that sensors can shut down
arbitrarily makes it hard to calculate which window an event should
fall into.

Have you tried looking into `ProcessFunction`? With this you can keep
state and set timers that fire based on either event time or processing
time. You could store your sensor data, then calculate results and evict
old events when the timers fire.

Best,
Aljoscha
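P.S. A minimal sketch of what I mean (all names are placeholders;
SensorReading is assumed to carry a String sensorId, a long event
timestamp in milliseconds and the cumulative counter value, and readings
of one sensor are assumed to arrive roughly in timestamp order):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Emits (timestamp of the previous reading, delta to the current reading)
// for each pair of consecutive readings of one sensor. Summing the deltas
// per hour can then happen downstream, or in this function with timers.
public class CounterDeltaFunction
        extends KeyedProcessFunction<String, SensorReading, Tuple2<Long, Long>> {

    private transient ValueState<SensorReading> lastReading;

    @Override
    public void open(Configuration parameters) {
        lastReading = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-reading", SensorReading.class));
    }

    @Override
    public void processElement(SensorReading current, Context ctx,
                               Collector<Tuple2<Long, Long>> out) throws Exception {
        SensorReading previous = lastReading.value();
        if (previous != null && current.timestamp > previous.timestamp) {
            // the delta belongs to the interval that starts at the previous reading
            out.collect(Tuple2.of(previous.timestamp, current.counter - previous.counter));
        }
        lastReading.update(current);

        // An event-time timer could be registered here, e.g. on the next hour
        // boundary, to emit the hourly sum and to clean up state for sensors
        // that have stopped sending:
        // ctx.timerService().registerEventTimeTimer(...);
    }
}

You would use it on a keyed stream, e.g.
readings.keyBy(r -> r.sensorId).process(new CounterDeltaFunction()).
Out-of-order readings are simply skipped in this sketch; handling them
properly would require buffering readings in state and sorting them when
a timer fires.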