Window on stream with timestamps ascending by key

cam
I wonder how to work with a stream whose event timestamps are ascending per key.

There can be a huge time skew between different keys. For example, when I (re)connect an event producer,
it sends all of its buffered results, possibly from the last few days.

Is it possible to trigger the window computation per key?

Example with a window of 5 seconds and the window function being the sum of the timestamps:

KEY1 1000
KEY1 1001
KEY1 1002
KEY2    1
KEY2    2
KEY2    3
KEY2    4
KEY2    5
KEY2 window => 15
KEY1 1003
KEY2    6
KEY2    7
KEY2    8
KEY2    9
KEY2   10
KEY2 window => 40
KEY1 1004
KEY2   11
KEY2   12
KEY2   13
KEY2   14
KEY2   15
KEY2 window => 65
KEY1 1005
KEY1 window => 5015
...


Re: Window on stream with timestamps ascending by key

Aljoscha Krettek
Hi,
what you would essentially need are watermarks that are tracked per key. Right now this is not possible in Flink. The watermarks, which track how far event time has progressed, are global across all keys.
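
To make the problem concrete, here is a minimal plain-Java sketch (no Flink dependency) of what a single global watermark does to the stream from the question. It is a simplified model, not Flink's actual implementation: the class and method names are hypothetical, and the watermark is assumed to simply follow the highest timestamp seen so far across all keys.

```java
import java.util.ArrayList;
import java.util.List;

final class GlobalWatermarkSketch {

    /** Returns the events that arrive behind one watermark shared by all keys. */
    static List<String> behindWatermark(String[] keys, long[] timestamps) {
        List<String> behind = new ArrayList<>();
        long watermark = Long.MIN_VALUE; // assumption: highest timestamp seen so far
        for (int i = 0; i < keys.length; i++) {
            if (timestamps[i] < watermark) {
                // The key itself is in order, but another key already pushed time forward.
                behind.add(keys[i] + " " + timestamps[i]);
            }
            watermark = Math.max(watermark, timestamps[i]);
        }
        return behind;
    }

    public static void main(String[] args) {
        String[] keys = {"KEY1", "KEY1", "KEY1", "KEY2", "KEY2", "KEY1"};
        long[] timestamps = {1000, 1001, 1002, 1, 2, 1003};
        System.out.println(behindWatermark(keys, timestamps)); // prints [KEY2 1, KEY2 2]
    }
}
```

Every KEY2 event is in order for its own key, yet all of them arrive behind the shared watermark once KEY1 has advanced it to 1000.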

Maybe you could implement something that fits your requirements in a custom operator, i.e. by using DataStream.transform() and writing a StreamOperator (more specifically a OneInputStreamOperator).

Let us know if you need more information.

Cheers,
Aljoscha

> On 17 Mar 2016, at 11:21, Charles-Antoine Mathieu <[hidden email]> wrote:
>
> I wonder how to work with a stream with event timestamps ascending by key.

Re: Window on stream with timestamps ascending by key

cam
Hi,

I'd be happy to implement such an operator. I'm fairly new to Flink, so I'm still diving into it as fast as I can.

Do you have some more insights about what to implement on top of OneInputStreamOperator? I'm not sure whether I should work with watermarks here or build a similar concept. How is that different from implementing the logic inside a FlatMap operator?

Regards,
CAM

Re: Window on stream with timestamps ascending by key

Stephan Ewen
If you do not use watermarks at all, but only special events that signal the "per key watermark", then you can simply build this on top of "keyBy().flatMap()".

The custom operator would only be needed if you need access to the watermarks, or the processing time trigger scheduler.
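
As a rough illustration, the per-key logic that such a flatMap function would carry can be modelled in plain Java without any Flink dependency. Everything here is an assumption made for the sketch: the class and method names are hypothetical, the two maps stand in for Flink keyed state, each key's own timestamp acts as its "per key watermark" instead of special marker events, timestamps are strictly ascending per key, and windows are aligned as (end - size, end] so that the sums match the example in the original question.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PerKeyWindowSketch {
    private final long size;
    // Stand-ins for keyed state: the open window of each key and its running sum.
    private final Map<String, Long> openWindowEnd = new HashMap<>();
    private final Map<String, Long> openWindowSum = new HashMap<>();

    PerKeyWindowSketch(long size) {
        this.size = size;
    }

    /** Handles one event and returns the window results it causes to fire. */
    List<String> onEvent(String key, long ts) {
        List<String> fired = new ArrayList<>();
        long end = Math.floorDiv(ts + size - 1, size) * size; // inclusive window end
        Long openEnd = openWindowEnd.get(key);
        if (openEnd != null && end < openEnd) {
            return fired; // older than the open window: breaks the assumption, dropped
        }
        if (openEnd != null && end > openEnd) {
            // The key's own time skipped past the open window: fire it first.
            fired.add(key + " window => " + openWindowSum.remove(key));
        }
        openWindowEnd.put(key, end);
        openWindowSum.merge(key, ts, Long::sum);
        if (ts == end) {
            // The last timestamp of the window was reached: fire immediately.
            fired.add(key + " window => " + openWindowSum.remove(key));
            openWindowEnd.remove(key);
        }
        return fired;
    }

    public static void main(String[] args) {
        PerKeyWindowSketch sketch = new PerKeyWindowSketch(5);
        for (long ts = 1001; ts <= 1003; ts++) {
            sketch.onEvent("KEY1", ts); // nothing fires yet for KEY1
        }
        for (long ts = 1; ts <= 5; ts++) {
            for (String result : sketch.onEvent("KEY2", ts)) {
                System.out.println(result); // prints KEY2 window => 15
            }
        }
    }
}
```

In a real job the two maps would be replaced by keyed state behind keyBy(), so that each parallel instance only holds the keys routed to it. Nothing in this sketch depends on the order in which different keys arrive, which is the point of tracking progress per key.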

On Thu, Mar 24, 2016 at 2:18 PM, cam <[hidden email]> wrote:
Hi,

I'd be happy to implement such an operator, I'm fairly new to Flink so I'm
still diving into it as fast as I can.


--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Window-on-stream-with-timestamps-ascending-by-key-tp5598p5745.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.