I need to collect time-series data from thousands of IoT devices. Each device publishes a name, a value, and a timestamp to one Kafka topic. The event-time timestamps are in order only with respect to the individual
device, but out of order with respect to other devices. Is there a way to aggregate this data in 15-minute windows such that each IoT device is aggregated according to its own event time? I would deeply appreciate it if somebody could guide me to an approach for solving this in Flink. I wish there was a group chat for these types of problems.
|
Hello, I am also working on something similar. Below is the pipeline design I have; I am sharing it in case it is helpful: topic -> keyed stream on device-id -> window operation -> sink. You can PM me for further details. Thanks, Hemant On Tue, Feb 25, 2020 at 1:54 AM Marco Villalobos <[hidden email]> wrote:
|
Hi, At the last Flink Forward in Berlin I spoke with some people about the same problem. They had construction devices as IoT sensors, which could even be offline for multiple days. They told me that the major problem for them was that the watermark in Flink is maintained per operator/subtask, even if you group by key. They solved their problem with a Flink process function, where they have full control over state and timers, so you can deal with each device as you like and can, e.g., maintain something similar to a per-device watermark. I also think that this is the best way to go for this use case. Best regards, Theo -------- Original message -------- From: hemant singh <[hidden email]> Date: Tue, 25 Feb 2020, 06:19 To: Marco Villalobos <[hidden email]> Cc: [hidden email] Subject: Re: Timeseries aggregation with many IoT devices off of one Kafka topic.
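To make the per-device watermark idea above more concrete, here is a minimal sketch of the bookkeeping in plain Java, without any Flink dependency. It assumes non-negative millisecond timestamps, tumbling 15-minute windows, and an average as the aggregate; the names `PerDeviceWindows`, `onEvent`, and `Result` are made up for illustration. In a real job this logic would presumably live in a keyed process function, with the map replaced by Flink keyed state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PerDeviceWindows {
    static final long WINDOW_MS = 15 * 60 * 1000L; // 15-minute tumbling windows

    /** One closed window for one device (hypothetical result type). */
    static final class Result {
        final String device;
        final long windowStart;
        final double average;

        Result(String device, long windowStart, double average) {
            this.device = device;
            this.windowStart = windowStart;
            this.average = average;
        }
    }

    /** Per-device state: open windows plus the device's own "watermark". */
    private static final class DeviceState {
        final TreeMap<Long, double[]> open = new TreeMap<>(); // start -> {sum, count}
        long maxEventTime = Long.MIN_VALUE;
    }

    private final Map<String, DeviceState> states = new HashMap<>();

    /** Adds one reading and returns the windows of THIS device that it closes. */
    List<Result> onEvent(String device, double value, long timestampMs) {
        DeviceState s = states.computeIfAbsent(device, d -> new DeviceState());
        long start = timestampMs - (timestampMs % WINDOW_MS); // assumes timestampMs >= 0
        double[] acc = s.open.computeIfAbsent(start, k -> new double[2]);
        acc[0] += value;
        acc[1] += 1;
        s.maxEventTime = Math.max(s.maxEventTime, timestampMs);

        // A window [start, start + WINDOW_MS) is complete once this device's
        // own event time has reached its end; other devices are irrelevant.
        List<Result> closed = new ArrayList<>();
        Iterator<Map.Entry<Long, double[]>> it = s.open.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, double[]> e = it.next();
            if (e.getKey() + WINDOW_MS > s.maxEventTime) {
                break; // TreeMap is sorted, so later windows are still open too
            }
            closed.add(new Result(device, e.getKey(), e.getValue()[0] / e.getValue()[1]));
            it.remove();
        }
        return closed;
    }

    public static void main(String[] args) {
        PerDeviceWindows w = new PerDeviceWindows();
        long min = 60_000L;
        w.onEvent("a", 10.0, 1 * min);
        w.onEvent("b", 99.0, 50 * min); // device b is far ahead of device a
        w.onEvent("a", 20.0, 14 * min);
        for (Result r : w.onEvent("a", 30.0, 16 * min)) {
            System.out.println(r.device + " " + r.windowStart + " " + r.average);
        }
    }
}
```

Note that in this sketch a device that goes offline never closes its last window, because only its own events advance its time. Handling that case would need some kind of timeout, for example a timer, which is left out here.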
|
Hi Theo, We also have the same scenario. It would be great if you could provide some examples or more details about the Flink process function. Thanks, Avinash
|
Hi, I think conceptually the pipeline could look something like this:

env
  .addSource(...)
  .keyBy("device_id")
  .window(SlidingEventTimeWindows.of(Time.minutes(15), Time.seconds(10)))
  .trigger(new Trigger {
    def onElement(el, timestamp, window, ctx) = {
      // 10_000 ms is the 10 s slide: register a timer only for the window
      // that starts at the slide boundary of this element's timestamp
      if (window.start == TimeWindow.getWindowStartWithOffset(timestamp, 0, 10_000)) {
        ctx.registerEventTimeTimer(window.end)
      }
      TriggerResult.CONTINUE
    }
    def onEventTime(time, window, ctx) = {
      TriggerResult.FIRE
    }
  })
  .aggregate(...)

(slide 10s needs to be adjusted) Regards, Roman
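As a side note on the condition in `onElement`: the following plain-Java sketch shows the arithmetic behind it, as far as I understand it. It assumes non-negative millisecond timestamps and a zero offset; `SlidingWindowMath`, `windowStart`, and `windowStarts` are hypothetical helper names, not Flink API. Only the formula is meant to mirror what `TimeWindow.getWindowStartWithOffset` computes.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowMath {
    /** Start of the slide-aligned window that contains the timestamp. */
    static long windowStart(long timestamp, long offset, long slide) {
        return timestamp - (timestamp - offset + slide) % slide;
    }

    /** Starts of all sliding windows (given size and slide) containing the timestamp. */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long last = windowStart(timestamp, 0, slide);
        // Walk backwards one slide at a time while the window still covers the timestamp
        for (long s = last; s > timestamp - size; s -= slide) {
            starts.add(s);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 15 * 60 * 1000L; // 15 minutes
        long slide = 10_000L;        // 10 seconds
        List<Long> starts = windowStarts(25_000L, size, slide);
        System.out.println(starts.size() + " windows, newest starts at " + starts.get(0));
    }
}
```

With a 15-minute size and a 10-second slide, each element falls into 90 windows, and the condition in the trigger matches only the newest of them, so a timer is registered for one window per element rather than for all 90.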
On Tue, Feb 25, 2020 at 3:44 PM Avinash Tripathy <[hidden email]> wrote:
|
Hi, Ververica has great tutorials online on how to write Flink pipelines, including a small training section on process functions: Best regards, Theo From: "Khachatryan Roman" <[hidden email]> To: "Avinash Tripathy" <[hidden email]> CC: "Theo Diefenthal" <[hidden email]>, "hemant singh" <[hidden email]>, "Marco Villalobos" <[hidden email]>, "user" <[hidden email]> Sent: Tuesday, 25 February 2020 19:08:16 Subject: Re: Timeseries aggregation with many IoT devices off of one Kafka topic.