My use case primarily concerns applying transformations per key, with the keys remaining fixed throughout the topology. I am using event time for my windows.
The problem I am currently facing is that watermarks in windows propagate per operator instance, meaning the operator's event time increases for all keys that the operator is in charge of. I wish for watermarks to progress per key, not per operator instance. Is this easily possible? I was unable to find an appropriate solution based on existing code recipes. Greetings Leon |
Hi, I'm afraid this is impossible with the current design of Flink. Might I ask what you want to achieve with this? Maybe we can come up with a solution. -Aljoscha On Tue, 31 May 2016 at 13:24 <[hidden email]> wrote:
|
Hi Aljoscha,
thanks for the speedy reply.

I am processing measurements delivered by smart meters. I use windows to gather measurements and calculate values such as average consumption. The key is simply the meter ID.

The challenge is that meters may undergo network partitioning, under which they fall back to local buffering. The data is then transmitted once connectivity has been re-established. I am using event time to obtain accurate calculations.

If a specific meter goes offline, and the watermark progresses to the next window for an operator instance, then all late data will be discarded once that meter is online again, until it has caught up to the event time. This is because I am using a custom EventTimeTrigger implementation that discards late elements. The reason for that is that Flink would otherwise immediately evaluate the window upon receiving a late element, which is a problem since my calculations (e.g. the average consumption) depend on multiple elements. I cannot calculate averages with that single late element.

Each individual meter guarantees in-order transmission of measurements. If watermarks progressed per key, then I would never have late elements because of that guarantee. I would be able to accurately calculate averages, with the trade-off that my results would arrive sporadically from the same operator instance.

I suppose I could bypass the use of windows by implementing a stateful map function that mimics windows to a certain degree. I implemented something similar in Storm, but the amount of application logic required is substantial.

I completely understand why Flink evaluates a window on a late element, since there is no other way to know when to evaluate the window, as event time has already progressed.

Perhaps there is a way to gather/redirect late elements?

Regards Leon 31. May 2016 13:37 by [hidden email]:
|
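For illustration, a minimal sketch of what such a late-element-discarding trigger could look like, written against a recent Flink 1.x Trigger API; the class name and the choice to PURGE (rather than silently ignore) late elements are assumptions, not the actual implementation from this thread:

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Fires once when the watermark passes the end of the window (like EventTimeTrigger),
// but never re-fires for late elements; late arrivals are swallowed instead.
public class DropLateEventTimeTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // The window has already fired: purge instead of re-evaluating it with the late element.
            return TriggerResult.PURGE;
        }
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}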
Hi, yeah, in that case per-key watermarks would be useful for you. It won't be possible to add such a feature, though, due to the (possibly) dynamic nature of the key space and how watermark tracking works. You should be able to implement it with relatively low overhead using a RichFlatMapFunction and keyed state. This is the relevant section of the doc: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#using-the-keyvalue-state-interface. We are also in the process of improving our windowing system, especially when it comes to late data, cleanup and trigger semantics. You can have a look here if you're interested: https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing. Best, Aljoscha On Tue, 31 May 2016 at 14:36 <[hidden email]> wrote:
|
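A rough sketch of that suggestion, with hypothetical Reading/Average POJOs (meterId, timestamp and value fields assumed); each meter's in-order guarantee serves as a per-key "watermark" that closes a tumbling one-minute window kept in keyed state:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class PerKeyWindowFlatMap extends RichFlatMapFunction<Reading, Average> {

    private static final long WINDOW_SIZE_MS = 60_000L;

    private transient ValueState<Long> windowEnd;  // end timestamp of the key's open window
    private transient ValueState<Double> sum;      // running sum for that window
    private transient ValueState<Long> count;      // running count for that window

    @Override
    public void open(Configuration parameters) {
        windowEnd = getRuntimeContext().getState(new ValueStateDescriptor<>("windowEnd", Long.class, -1L));
        sum = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Double.class, 0.0));
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class, 0L));
    }

    @Override
    public void flatMap(Reading reading, Collector<Average> out) throws Exception {
        if (windowEnd.value() < 0) {
            windowEnd.update(windowEndFor(reading.timestamp));
        } else if (reading.timestamp >= windowEnd.value()) {
            // Each meter sends in order, so an element past the window end means the
            // window is complete: emit and start a new one (the per-key "watermark").
            if (count.value() > 0) {
                out.collect(new Average(reading.meterId, windowEnd.value(), sum.value() / count.value()));
            }
            sum.update(0.0);
            count.update(0L);
            windowEnd.update(windowEndFor(reading.timestamp));
        }
        sum.update(sum.value() + reading.value);
        count.update(count.value() + 1);
    }

    private static long windowEndFor(long timestamp) {
        return (timestamp / WINDOW_SIZE_MS + 1) * WINDOW_SIZE_MS;
    }
}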
In reply to this post by leon_mclare
Hi again Aljoscha,
understood. Thanks for the link. I really like the straightforward approach to storing state. It makes things very easy. The improvements are very interesting, particularly the composite triggers. That would significantly improve flexibility. Kind regards Leon 1. Jun 2016 14:54 by [hidden email]:
|
In reply to this post by Aljoscha Krettek
Hi Aljoscha! Is it possible somehow to use the RichXFunction in CEP?
The task is pretty similar, but I have to ignore the next triggered event for the same key once. On Wed, Jun 1, 2016 at 2:54 PM, Aljoscha Krettek <[hidden email]> wrote:
|
Hi,
I think you first have to convert back to a DataStream using .select() or .flatSelect(). But Till should know more about this, maybe he can help. Cheers, Aljoscha On Thu, 2 Jun 2016 at 19:19 Kanstantsin Kamkou <[hidden email]> wrote: Hi Aljoscha! Is it possible somehow to use the RichXFunction in CEP? |
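A rough sketch of that, assuming the pre-1.3 Flink CEP API (where select() receives a Map<String, T>); Event, Alert, the pattern variable and the IgnoreNextAlertOnce flat map are all placeholders:

// Turn the matches back into a regular DataStream via select() ...
PatternStream<Event> matches = CEP.pattern(events.keyBy("id"), pattern);

DataStream<Alert> alerts = matches.select(new PatternSelectFunction<Event, Alert>() {
    @Override
    public Alert select(Map<String, Event> match) {
        return new Alert(match.get("warning"));
    }
});

// ... and on the resulting keyed stream, a RichFlatMapFunction with keyed state
// (as sketched above) can drop the next triggered event for the same key once.
alerts.keyBy("id")
      .flatMap(new IgnoreNextAlertOnce());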
In reply to this post by Aljoscha Krettek
Hello,
I have similar requirements (see StackOverflow). I am pretty new to Flink; could you elaborate on a possible solution? We can guarantee good ordering by sensor_id, so watermarking by key would be the only reasonable way for us (sensorData.keyBy('id').timeWindow(1.minute).sum('value')). Could I do my own watermarking per key after keying, e.g. sensorData.keyBy('id').overwriteWatermarking()...? Or maybe use custom state plus a custom trigger? And what happens if a sensor dies or is removed completely? How can this be detected, given that watermarks would be ignored for window garbage collection? |
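For reference, the quoted pipeline in the Java DataStream API would look roughly like this (assuming a Reading POJO with id and value fields, and timestamps/watermarks assigned upstream):

DataStream<Reading> sensorData = ...; // assumed source

sensorData
    .keyBy("id")
    .timeWindow(Time.minutes(1))
    .sum("value");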
In reply to this post by leon_mclare
Hello,
I found this question in the Nabble archive (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html) but was unable to / don't know how to reply there. Here is my question regarding the mentioned thread: Hello, Thanks, Stephan |
Hi Stephan, I just wrote an answer to your SO question. 2016-11-10 11:01 GMT+01:00 Stephan Epping <[hidden email]>:
|
Hello Fabian,
Thank you very much. What is your opinion on the following solution:
- window data per time window, e.g. 15 minutes
- use processing time as the trigger, e.g. every 15 minutes
- which results in an aggregate over sensor values
- then use Cassandra to select the previous aggregate (as there can be multiple aggregates for the time window due to processing time)
- then update the aggregate and put it into a Cassandra sink again
The Cassandra select will be a bit slower than using in-memory/Flink state, but will be cheaper in the end. Further, what consequences does this have? For example, replaying events will be more difficult, right? Also, what about snapshots? Will they work with the mentioned design? kind regards, Stephan
|
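A sketch of one possible reading of that design; Reading/ReadingAggregate, the AggregateReadings window function and the CassandraUpsertSink doing the read-merge-write are all placeholders:

DataStream<ReadingAggregate> aggregates = readings
    .keyBy("sensorId")
    .window(TumblingEventTimeWindows.of(Time.minutes(15)))
    // fire repeatedly on processing time instead of waiting for the event-time watermark
    .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(15)))
    .apply(new AggregateReadings());

// the sink reads the previously stored aggregate for the window from Cassandra,
// merges it with the new partial result and writes it back
aggregates.addSink(new CassandraUpsertSink());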
Hi Stephan, I'm skeptical about two things: 2016-11-14 8:39 GMT+01:00 Stephan Epping <[hidden email]>:
|
Hey Fabian,
thank you very much.
- Yes, I would window by event time and fire/purge by processing time.
- "Cheaper in the end" meant that having too much state in the Flink cluster would be more expensive, as we store all data in Cassandra too. I think the fault tolerance would be okay, as we would do a compare-and-set with Cassandra.
With the flatMap operator, wouldn't it be like running my own windowing mechanism? I need to keep the aggregate window per sensor open (with checkpointing and state management) until I receive an element for a sensor that is later in time than the window's end time, and then purge the state and emit a new event (which is like having a watermark per sensor). Further, I need a timer that fires after, say, 24 hours in case a sensor dies and doesn't send any more data, which might be possible with a window assigner/trigger, right? But not inside normal functions, e.g. flatMap? We can guarantee that all sensor data per sensor comes almost in order (it might be out of order within a few seconds), but there might be gaps of several hours after network partitions. Is there no way to define/redefine the watermark per keyed stream? Or to adjust the window assigner + trigger to achieve the desired behaviour? I am a bit hesitant to implement the whole state management myself. Do you plan to support such use cases on keyed streams? Maybe the watermark assigner could also receive information about the key for which the watermark should be calculated, etc. best, Stephan
|
Hi Stephan, I was going to suggest that using a flatMap and tracking the timestamp of each key yourself is a bit like having a per-key watermark. I wanted to wait a bit before answering because I'm currently working on a new type of function that will be released with Flink 1.2: ProcessFunction. This is somewhat like a FlatMap but also lets you access the element timestamp, query the current processing time/event time and set (per-key) timers for processing time and event time. With this you should be able to easily implement your per-key tracking, I hope. Cheers, Aljoscha P.S. ProcessFunction is already in the Flink repository, but it's called TimelyFlatMapFunction right now because I was working on it under that working title. On Mon, 14 Nov 2016 at 15:47 kaelumania <[hidden email]> wrote:
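To make that concrete, a sketch of per-sensor "windows" in a ProcessFunction, written against the API as it appears in later Flink releases (the class was still called TimelyFlatMapFunction at the time of this thread); Reading and Average are placeholder types, and the 24-hour processing-time flush is a safety net for sensors that go silent:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Must run on a keyed stream, e.g. readings.keyBy("sensorId").process(new PerSensorWindow())
public class PerSensorWindow extends ProcessFunction<Reading, Average> {

    private static final long WINDOW_MS = 15 * 60 * 1000L;
    private static final long IDLE_TIMEOUT_MS = 24 * 60 * 60 * 1000L;

    private transient ValueState<Long> windowEnd;
    private transient ValueState<Double> sum;
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        windowEnd = getRuntimeContext().getState(new ValueStateDescriptor<>("windowEnd", Long.class, -1L));
        sum = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Double.class, 0.0));
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class, 0L));
    }

    @Override
    public void processElement(Reading reading, Context ctx, Collector<Average> out) throws Exception {
        long timestamp = ctx.timestamp(); // assumes timestamps have been assigned upstream
        if (windowEnd.value() < 0) {
            windowEnd.update((timestamp / WINDOW_MS + 1) * WINDOW_MS);
        } else if (timestamp >= windowEnd.value()) {
            // the sensor sends in order, so this element acts as its per-key watermark
            emitAndReset(out);
            windowEnd.update((timestamp / WINDOW_MS + 1) * WINDOW_MS);
        }
        sum.update(sum.value() + reading.value);
        count.update(count.value() + 1);
        // safety net: flush after 24h of processing time if the sensor goes silent
        // (a real implementation would remember and replace this timer instead of piling them up)
        ctx.timerService().registerProcessingTimeTimer(
            ctx.timerService().currentProcessingTime() + IDLE_TIMEOUT_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Average> out) throws Exception {
        emitAndReset(out);
    }

    private void emitAndReset(Collector<Average> out) throws Exception {
        if (count.value() > 0) {
            out.collect(new Average(windowEnd.value(), sum.value() / count.value()));
        }
        sum.update(0.0);
        count.update(0L);
        windowEnd.update(-1L);
    }
}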
Hey Aljoscha,
that sounds very promising, awesome! Though I would still need to implement my own window management logic (window assignment and window state purging), right? I was thinking about reusing some of the existing components (TimeWindow, WindowAssigner) but running my own WindowOperator (aka ProcessFunction). I am not sure if that can be done easily, though. I would love to hear your opinion on that, and on what the tricky parts will be, for example common mistakes you experienced while developing the windowing mechanism. best Stephan
|
In reply to this post by Fabian Hueske-2
Hi Fabian,
your proposed solution for:
You can construct a data flow of cascading window operators and fork off (to emit or to process further) the result after each window.
does not work, or am I missing something? First I tried the following:

DataStream<Reading> values = input.assignTimestampsAndWatermarks(new StrictWatermarkAssigner()); // force lateness

but due to late data the first fold function would emit two rolling aggregates (one with and one without the late element), which results in the value being counted twice within the second reducer. Therefore I tried:

WindowedStream<Reading, Tuple, TimeWindow> readingsPerMinute = input

which gives me a compiler error, as WindowedStream does not provide a timeWindow method. Finally I settled with this:

KeyedStream<Reading, Tuple> readings = input

Feedback is very welcome. best, Stephan
|
Hi, why did you settle for the last solution? Cheers, Aljoscha On Thu, 17 Nov 2016 at 15:57 kaelumania <[hidden email]> wrote: Hi Fabian, |
Hey Aljoscha,
the first solution did not work out as expected: when late elements arrive, the first window is triggered again and emits a new (accumulated) event, which would then be counted twice (once in the on-time accumulation and once in the late accumulation) in the second window. I could implement my own discarding strategy, like in Apache Beam, but the output stream should contain the accumulated events that are stored in Cassandra. The second solution just gave a compiler error, so I think it is not possible right now. best Stephan
|
You can implement discarding behaviour by writing a custom trigger (based on EventTimeTrigger) that returns FIRE_AND_PURGE when firing. With this you could maybe implement a cascade of windows where the first one aggregates for the smallest time interval and is discarding, and where the later windows take these "pre-aggregated" values and accumulate them. On Tue, 22 Nov 2016 at 08:11 Stephan Epping <[hidden email]> wrote:
|
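A sketch of that cascade, where DiscardingEventTimeTrigger is assumed to be a copy of EventTimeTrigger that returns FIRE_AND_PURGE instead of FIRE, and the aggregate types and window functions are placeholders:

// smallest interval: pre-aggregate per minute, then purge the window contents
DataStream<MinuteAggregate> perMinute = readings
    .keyBy("sensorId")
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .trigger(new DiscardingEventTimeTrigger())
    .apply(new AggregatePerMinute());

// next level: accumulate the purged pre-aggregates into hourly results
DataStream<HourAggregate> perHour = perMinute
    .keyBy("sensorId")
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .apply(new AggregatePerHour());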
Sounds good to me. But I still need to have some kind of side output (Cassandra) that stores the accumulating aggregates on each time scale (minute, hour). Thus I would need to have something like this:
var hourly = stream.window(1.hour).apply(..)
// write to cassandra
hourly.trigger(accumulating).addSink(cassandra)
// forward to next acc step
var daily = hourly.trigger(discarding).window(1.day).apply(…)
// write to cassandra
daily.trigger(accumulating).addSink(cassandra)
Would this be possible?
|