I wonder how to work with a stream whose event timestamps are ascending per key.

There can be a huge time skew between different keys. For example, if I (re)connect an event producer, it will send all of its buffered results, possibly from the last few days.

Is it possible to trigger the window computation per key?

Example with a window of 5 seconds and the window function being the sum of the timestamps:

KEY1 1000
KEY1 1001
KEY1 1002
KEY2 1
KEY2 2
KEY2 3
KEY2 4
KEY2 5
KEY2 window => 15
KEY1 1003
KEY2 6
KEY2 7
KEY2 8
KEY2 9
KEY2 10
KEY2 window => 40
KEY1 1004
KEY2 11
KEY2 12
KEY2 13
KEY2 14
KEY2 15
KEY2 window => 65
KEY1 1005
KEY1 window => 5015
...
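The per-key semantics asked for here can be sketched outside Flink. The following is a plain-Python simulation, not Flink API, and the function and variable names are hypothetical. It treats each key's latest timestamp as that key's own watermark, assumes strictly ascending timestamps per key, and uses windows covering (end - 5, end] because that is how the sums in the example line up (1+2+3+4+5 = 15, 1001+...+1005 = 5015).

```python
from collections import defaultdict


def per_key_windows(events, size=5):
    """Emit (key, window_end, sum) once a key's own timestamps pass a window.

    Assumptions: timestamps are strictly ascending per key, and a window
    covers (end - size, end]. Each key's latest timestamp acts as that
    key's watermark, so other keys never delay or invalidate it.
    """
    open_windows = defaultdict(dict)  # key -> {window_end: running sum}
    out = []
    for key, ts in events:
        end = -(-ts // size) * size  # smallest multiple of size >= ts
        wins = open_windows[key]
        wins[end] = wins.get(end, 0) + ts
        # With strictly ascending timestamps, every window of this key
        # ending at or before ts can no longer receive elements.
        for w_end in sorted(e for e in wins if e <= ts):
            out.append((key, w_end, wins.pop(w_end)))
    return out


# The interleaving from the question.
EVENTS = (
    [("KEY1", 1000), ("KEY1", 1001), ("KEY1", 1002)]
    + [("KEY2", t) for t in range(1, 6)]
    + [("KEY1", 1003)]
    + [("KEY2", t) for t in range(6, 11)]
    + [("KEY1", 1004)]
    + [("KEY2", t) for t in range(11, 16)]
    + [("KEY1", 1005)]
)

if __name__ == "__main__":
    for key, end, total in per_key_windows(EVENTS):
        print(f"{key} window ({end - 5}, {end}] => {total}")
```

On the question's events this yields the KEY2 windows 15, 40 and 65 and the KEY1 window 5015 at the same positions as in the example. It also emits a KEY1 window (995, 1000] => 1000 right after KEY1 1000, which the example does not list.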
Hi,
what you would essentially need is watermarks that are tracked per key. Right now this is not possible in Flink: the watermarks, which are used to keep track of event-time progress, are global across all keys.

Maybe you could implement something that fits your requirements in a custom operator, i.e. by using DataStream.transform() and writing a StreamOperator (more specifically, a OneInputStreamOperator).

Let us know if you need more information.

Cheers,
Aljoscha

> On 17 Mar 2016, at 11:21, Charles-Antoine Mathieu wrote:
>
> I wonder how to work with a stream with event timestamps ascending by key.
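To see why a single watermark is the obstacle, here is a plain-Python sketch (not Flink code; names are hypothetical) run on the question's events. It assumes a watermark that trails the highest timestamp seen so far by one, as with ascending timestamps, so an element is late when its timestamp is below that highest value. The same rule is applied either to the whole stream or separately per key.

```python
def late_elements(events, per_key):
    """Return the elements that fall behind the watermark.

    Assumed watermark model: it trails the highest timestamp seen so far,
    either for the whole stream (per_key=False) or separately for each
    key (per_key=True). An element is late if its timestamp is below it.
    """
    highest = {}  # scope -> highest timestamp seen so far
    late = []
    for key, ts in events:
        scope = key if per_key else None  # None stands for one global scope
        if scope in highest and ts < highest[scope]:
            late.append((key, ts))  # behind the watermark for its scope
        highest[scope] = max(ts, highest.get(scope, ts))
    return late


# The interleaving from the question.
EVENTS = (
    [("KEY1", 1000), ("KEY1", 1001), ("KEY1", 1002)]
    + [("KEY2", t) for t in range(1, 6)]
    + [("KEY1", 1003)]
    + [("KEY2", t) for t in range(6, 11)]
    + [("KEY1", 1004)]
    + [("KEY2", t) for t in range(11, 16)]
    + [("KEY1", 1005)]
)

if __name__ == "__main__":
    print("global :", late_elements(EVENTS, per_key=False))
    print("per key:", late_elements(EVENTS, per_key=True))
```

With one global scope, KEY1 pushes the watermark past 1000 before KEY2 starts, so all 15 KEY2 elements are behind it; tracked per key, nothing is late. In a Flink job such elements would be treated as late, which is why per-key bookkeeping has to live in user code or a custom operator.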
Hi,
I'd be happy to implement such an operator. I'm fairly new to Flink, so I'm still diving into it as fast as I can. Do you have more insight into what I should implement on top of OneInputStreamOperator? I'm not sure whether I should work with watermarks here or build a similar concept of my own. How is that different from implementing the logic inside a FlatMap operator?

Regards,
CAM
If you do not use watermarks at all, but only special events that signal the "per-key watermark", then you can simply build this on top of "keyBy().flatMap()". The custom operator is only needed if you need access to the watermarks or to the processing-time trigger scheduler.

On Thu, Mar 24, 2016 at 2:18 PM, cam wrote:
> How is that different from implementing the logic inside a FlatMap operator?
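The keyBy().flatMap() approach with special marker events can be sketched as follows. This is a plain-Python simulation of the per-key logic, not Flink code: the event tuples, the class and function names, and the (end - 5, end] window convention are hypothetical, and the plain dict stands in for the keyed state a flatMap would keep after keyBy().

```python
from collections import defaultdict


class PerKeyWatermarkFlatMap:
    """Plain-Python stand-in for a flatMap applied after keyBy().

    Hypothetical event shapes: ("data", key, ts) and ("watermark", key, ts),
    where a watermark event promises no more data for that key with a
    timestamp <= ts. Windows cover (end - size, end].
    """

    def __init__(self, size=5):
        self.size = size
        # key -> {window_end: sum of timestamps}; keyed state in a real job
        self.windows = defaultdict(dict)

    def flat_map(self, event):
        kind, key, ts = event
        wins = self.windows[key]
        if kind == "data":
            end = -(-ts // self.size) * self.size  # end of the window holding ts
            wins[end] = wins.get(end, 0) + ts
            return []
        # Per-key watermark: emit every window of this key that it completes.
        return [(key, end, wins.pop(end)) for end in sorted(wins) if end <= ts]


def run(stream, size=5):
    op = PerKeyWatermarkFlatMap(size)
    return [out for event in stream for out in op.flat_map(event)]


# Hypothetical stream in which producers emit their own per-key markers.
STREAM = (
    [("data", "KEY2", t) for t in range(1, 8)]
    + [("watermark", "KEY2", 5), ("data", "KEY1", 1003)]
    + [("watermark", "KEY1", 1005), ("watermark", "KEY2", 10)]
)

if __name__ == "__main__":
    for key, end, total in run(STREAM):
        print(f"{key} window ({end - 5}, {end}] => {total}")
```

Because each producer emits its own markers, every key advances independently and the function needs neither Flink's watermarks nor timers. Data arriving for a key after a marker that already covers its timestamp is not handled in this sketch.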