Watermarks per key

jganoff
Hi,

I’m designing a streaming job whose elements need to be windowed by event time across a large set of keys. All elements are read from the same source. Event time progresses independently across keys. Is it possible to assign timestamps, and thus generate independent watermarks, per keyed stream, so late arriving elements can be handled per keyed stream?

And in general, what’s the best approach to designing a job that needs to process different keyed streams whose event times do not relate to each other? My current approach generates timestamps at the source but never generates watermarks so no record is ever considered late. This has the unfortunate side effect of windows never closing. As a result, each event time window relies on a custom trigger which fires and purges the window after a given amount of processing time elapses during which no new records arrived.
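The workaround described above can be sketched outside Flink in plain Java. Records are bucketed per key into tumbling event-time windows, and a window fires and is purged once no record has arrived for it within a processing-time gap. The names here (`InactivityWindows`, `add`, `sweep`) are hypothetical, and the processing-time clock is passed in explicitly so the behaviour is easy to follow; in a real job this logic would sit in the custom trigger.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for "event-time windows closed by a processing-time
// inactivity timeout". This is not Flink API.
class InactivityWindows {
    // Contents of one (key, window) bucket plus its last arrival time.
    static final class Bucket {
        final List<String> values = new ArrayList<>();
        long lastArrivalMs;
    }

    private final long windowSizeMs;
    private final long inactivityGapMs;
    private final Map<String, Bucket> buckets = new HashMap<>();

    InactivityWindows(long windowSizeMs, long inactivityGapMs) {
        this.windowSizeMs = windowSizeMs;
        this.inactivityGapMs = inactivityGapMs;
    }

    // Assign the record to its tumbling event-time window for this key and
    // remember when (in processing time) it arrived.
    void add(String key, long eventTimeMs, String value, long nowMs) {
        long windowStart = eventTimeMs - Math.floorMod(eventTimeMs, windowSizeMs);
        Bucket b = buckets.computeIfAbsent(key + "@" + windowStart, k -> new Bucket());
        b.values.add(value);
        b.lastArrivalMs = nowMs;
    }

    // Fire and purge every window that has been quiet for the whole gap.
    Map<String, List<String>> sweep(long nowMs) {
        Map<String, List<String>> fired = new HashMap<>();
        Iterator<Map.Entry<String, Bucket>> it = buckets.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Bucket> e = it.next();
            if (nowMs - e.getValue().lastArrivalMs >= inactivityGapMs) {
                fired.put(e.getKey(), e.getValue().values);
                it.remove();
            }
        }
        return fired;
    }

    int openWindows() {
        return buckets.size();
    }
}
```

Because no watermark is involved, a record that arrives after its window was purged simply opens a new bucket for the same window, which is the price of never treating anything as late.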

Thanks,
Jordan

Re: Watermarks per key

Fabian Hueske-2
Hi Jordan,

It is not possible to generate watermarks per key. This feature has been requested a couple of times, but I think there are no plans to implement it.
As far as I understand, managing per-key watermarks would be quite expensive (maintaining many watermarks, purging the watermarks of expired keys, etc.), but Aljoscha (in CC) can share details about that.

Best,
Fabian

In reply to Jordan Ganoff's message of 2017-02-15 2:02 GMT+01:00.


Re: Watermarks per key

jganoff
Nothing in the implementation stops me from assigning timestamps and generating watermarks on a keyed stream, and the KeyedStream API supports it. It appears, though, that the underlying operator created in DataStream.assignTimestampsAndWatermarks() isn't key-aware and tracks timestamps globally. Is that what technically prevents per-key timestamp assignment from working?
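To make the difference concrete, here is a plain-Java sketch (not Flink code; `WatermarkTracker` and `isLate` are hypothetical names). In global mode one maximum timestamp is shared by all keys, so a record from a key whose event time lags behind is judged late. In per-key mode each key is judged only against its own progress. The watermark is assumed to be the maximum timestamp seen minus a fixed out-of-orderness bound, and "late" is simplified to "timestamp below that watermark".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker contrasting one shared watermark with one per key.
class WatermarkTracker {
    private final boolean perKey;
    private final long maxOutOfOrdernessMs;
    private final Map<String, Long> maxTimestamp = new HashMap<>();

    WatermarkTracker(boolean perKey, long maxOutOfOrdernessMs) {
        this.perKey = perKey;
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Returns true if the record is behind the watermark that applies to it.
    // Otherwise the record advances that watermark and false is returned.
    boolean isLate(String key, long eventTimeMs) {
        String slot = perKey ? key : ""; // global mode: every key shares one slot
        Long max = maxTimestamp.get(slot);
        if (max != null && eventTimeMs < max - maxOutOfOrdernessMs) {
            return true;
        }
        maxTimestamp.merge(slot, eventTimeMs, Long::max);
        return false;
    }
}
```

With a bound of 10 ms, feeding `("fast", 1000)` and then `("slow", 100)` marks the second record late in global mode but not in per-key mode.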

I'm curious to hear Aljoscha's thoughts on watermark management across keys.

Thanks!

Re: Watermarks per key

Aljoscha Krettek
Hi,
Managing a per-key watermark would require keeping the current watermark for each key, for example at the sources or in a timestamp/watermark assigner. The problem then is figuring out when you can discard that state, because it would otherwise grow indefinitely if you have an evolving key space.
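One possible way to bound that state, sketched in plain Java, is to forget any key that has been idle for a fixed processing-time TTL. The TTL policy and the names `ExpiringWatermarks`, `advance` and `expire` are assumptions made for this example; they are not proposed in this thread and are not Flink API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-key watermark store that drops keys after a period of inactivity.
class ExpiringWatermarks {
    private static final class State {
        long watermarkMs = Long.MIN_VALUE;
        long lastSeenMs;
    }

    private final long ttlMs;
    private final Map<String, State> byKey = new HashMap<>();

    ExpiringWatermarks(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    // Advance the key's watermark (it never moves backwards) and record the
    // processing time of this access. Returns the key's current watermark.
    long advance(String key, long eventTimeMs, long nowMs) {
        State s = byKey.computeIfAbsent(key, k -> new State());
        s.watermarkMs = Math.max(s.watermarkMs, eventTimeMs);
        s.lastSeenMs = nowMs;
        return s.watermarkMs;
    }

    // Drop every key that has been idle for at least ttlMs; returns how many were removed.
    int expire(long nowMs) {
        int before = byKey.size();
        byKey.values().removeIf(s -> nowMs - s.lastSeenMs >= ttlMs);
        return before - byKey.size();
    }

    int trackedKeys() {
        return byKey.size();
    }
}
```

The trade-off is that a key returning after expiry starts from a fresh watermark, so records that would have been late before expiry are accepted again.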

You can simulate per-key watermarks by having a wrapping type in your pipeline that carries the watermarks, something like ValueOrWatermark<K, V>. In your operators you would then manage the watermark manually, per key, in your own state. You would run into the same problem of the evolving key space, however.
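A rough sketch of what such a wrapper might look like, in plain Java with no Flink dependency. Only the name `ValueOrWatermark<K, V>` comes from the message above; its fields, the factory methods, and the `PerKeyWindowCounter` operator are assumptions made for illustration. The operator counts values per key in tumbling windows and emits a window once that key's own watermark reaches the window end.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Either a value or a watermark, both scoped to one key.
final class ValueOrWatermark<K, V> {
    final K key;
    final V value;           // null when this element is a watermark
    final long timestampMs;  // event time of the value, or the watermark itself
    final boolean isWatermark;

    private ValueOrWatermark(K key, V value, long timestampMs, boolean isWatermark) {
        this.key = key;
        this.value = value;
        this.timestampMs = timestampMs;
        this.isWatermark = isWatermark;
    }

    static <K, V> ValueOrWatermark<K, V> value(K key, V value, long timestampMs) {
        return new ValueOrWatermark<>(key, value, timestampMs, false);
    }

    static <K, V> ValueOrWatermark<K, V> watermark(K key, long timestampMs) {
        return new ValueOrWatermark<>(key, null, timestampMs, true);
    }
}

// Hypothetical operator that keeps the watermark and window contents per key.
class PerKeyWindowCounter<K, V> {
    private final long windowSizeMs;
    private final Map<K, Long> watermarks = new HashMap<>();
    private final Map<K, TreeMap<Long, Integer>> counts = new HashMap<>();

    PerKeyWindowCounter(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Returns "key@windowStart=count" for each window closed by this element.
    List<String> process(ValueOrWatermark<K, V> element) {
        List<String> out = new ArrayList<>();
        K key = element.key;
        if (!element.isWatermark) {
            long wm = watermarks.getOrDefault(key, Long.MIN_VALUE);
            long start = element.timestampMs - Math.floorMod(element.timestampMs, windowSizeMs);
            if (start + windowSizeMs > wm) { // drop values whose window already closed
                counts.computeIfAbsent(key, k -> new TreeMap<>()).merge(start, 1, Integer::sum);
            }
            return out;
        }
        // Watermark for this key only: advance it and close the windows it has passed.
        watermarks.merge(key, element.timestampMs, Long::max);
        long wm = watermarks.get(key);
        TreeMap<Long, Integer> windows = counts.get(key);
        while (windows != null && !windows.isEmpty()
                && windows.firstKey() + windowSizeMs <= wm) {
            Map.Entry<Long, Integer> closed = windows.pollFirstEntry();
            out.add(key + "@" + closed.getKey() + "=" + closed.getValue());
        }
        return out;
    }
}
```

Both maps in this sketch grow with the key space, which is the evolving key-space problem in miniature: some expiry policy would still be needed.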

Cheers,
Aljoscha

In reply to jganoff's message of Mon, 20 Feb 2017 at 22:27.