Auto adjusting watermarks?

Auto adjusting watermarks?

Theo
Hi there,

Currently, I have a job pipeline that reads data from more than 10 different kinds of sources, each with different out-of-orderness characteristics. I am working on adjusting the watermarks for each source "properly". I use BoundedOutOfOrdernessTimestampExtractor and, as usual, I want maxOutOfOrderness to be as low as possible while still keeping as many elements as possible on time, because late arrivals trigger rather expensive computations.

Now I think what I probably want is something like "I want about 99.9% of my elements to arrive within the allowed lateness". Of course, I don't know the out-of-orderness of future events, but I can predict it from the past, e.g. via a histogram and its 99.9th percentile, and adjust maxOutOfOrderness dynamically.
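A minimal plain-Java sketch of the histogram part, assuming out-of-orderness is measured as how far an element's timestamp lags behind the highest timestamp seen so far. The class name `OutOfOrdernessHistogram`, the fixed bucket width, and clamping very large delays into the last bucket are illustrative assumptions, not Flink API. The percentile query returns the upper edge of a bucket, which could serve as the next maxOutOfOrderness.

```java
/**
 * Hypothetical sketch: fixed-width histogram of observed out-of-orderness.
 * Delay of an element = highest timestamp seen so far - element timestamp.
 */
class OutOfOrdernessHistogram {
    private final long bucketWidthMs;
    private final long[] counts;
    private long total;
    private long maxTimestamp = Long.MIN_VALUE;

    OutOfOrdernessHistogram(long bucketWidthMs, int numBuckets) {
        if (bucketWidthMs <= 0 || numBuckets <= 0) {
            throw new IllegalArgumentException("bucket width and count must be positive");
        }
        this.bucketWidthMs = bucketWidthMs;
        this.counts = new long[numBuckets];
    }

    /** Records the delay of one element, given its event timestamp in ms. */
    void observe(long timestampMs) {
        if (timestampMs > maxTimestamp) {
            maxTimestamp = timestampMs; // new maximum: this element is not delayed
        }
        long delay = maxTimestamp - timestampMs;
        // Assumption: delays beyond the histogram range are clamped into the last bucket.
        int idx = (int) Math.min(delay / bucketWidthMs, counts.length - 1);
        counts[idx]++;
        total++;
    }

    long count() {
        return total;
    }

    /**
     * Upper edge of the first bucket at which the cumulative count reaches the
     * requested quantile (e.g. 0.999). Returns 0 if nothing was observed yet.
     */
    long percentileUpperBound(double quantile) {
        if (total == 0) {
            return 0;
        }
        long needed = (long) Math.ceil(quantile * total);
        long seen = 0;
        for (int i = 0; i < counts.length; i++) {
            seen += counts[i];
            if (seen >= needed) {
                return (i + 1) * bucketWidthMs;
            }
        }
        return counts.length * bucketWidthMs;
    }
}
```

Rounding up to a bucket edge errs on the side of a slightly larger bound, i.e. fewer late elements at the cost of some latency; the bucket width trades precision against memory.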

As Flink only provides rather simple timestamp assigners but allows me to create my own with arbitrary complexity, I was wondering whether any of you have already done something like that, whether it's a viable approach, and whether I'm on the right track here.

Best regards
Theo

Re: Auto adjusting watermarks?

Congxian Qiu
Hi

Would it work for your case to store the histogram data in a custom `BoundedOutOfOrdernessTimestampExtractor` and adjust `maxOutOfOrderness` according to it? (Be careful: such histogram data would not be included in snapshots when checkpointing.)
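One way such a custom assigner could look, sketched in plain Java so it runs without Flink. The idea is that `onEvent` would be called from `extractTimestamp` and `currentWatermark` from `getCurrentWatermark` of a custom `AssignerWithPeriodicWatermarks`; the class name, the sliding window of recent delays (instead of an all-time histogram), and the fixed initial bound used until the window is full are assumptions for illustration. As noted above, the fields are plain Java fields rather than Flink state, so they are not checkpointed and start empty after every restore.

```java
import java.util.Arrays;

/**
 * Hypothetical core of a periodic watermark assigner whose out-of-orderness
 * bound is a percentile of the most recently observed delays.
 * NOTE: plain Java fields, not Flink state, so nothing here survives a restore.
 */
class AdaptiveBoundWatermarks {
    private final long[] recentDelays; // ring buffer of the last N delays (ms)
    private int size;
    private int next;
    private final double quantile;     // e.g. 0.999
    private final long initialBoundMs; // fixed bound used until the buffer is full
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastWatermark = Long.MIN_VALUE;

    AdaptiveBoundWatermarks(int window, double quantile, long initialBoundMs) {
        if (window <= 0 || quantile <= 0 || quantile > 1) {
            throw new IllegalArgumentException("invalid window or quantile");
        }
        this.recentDelays = new long[window];
        this.quantile = quantile;
        this.initialBoundMs = initialBoundMs;
    }

    /** Call once per element with its event timestamp. */
    void onEvent(long timestampMs) {
        if (timestampMs > maxTimestamp) {
            maxTimestamp = timestampMs;
        }
        recentDelays[next] = maxTimestamp - timestampMs;
        next = (next + 1) % recentDelays.length;
        if (size < recentDelays.length) {
            size++;
        }
    }

    /** Current bound: the requested percentile of the buffered delays. */
    long currentBound() {
        if (size < recentDelays.length) {
            return initialBoundMs; // not enough samples yet: fall back to the fixed bound
        }
        long[] sorted = recentDelays.clone();
        Arrays.sort(sorted);
        int idx = (int) Math.ceil(quantile * size) - 1;
        return sorted[Math.max(0, Math.min(idx, size - 1))];
    }

    /** Periodic watermark; never moves backwards even if the bound grows. */
    long currentWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return lastWatermark; // no element seen yet
        }
        long candidate = maxTimestamp - currentBound() - 1;
        if (candidate > lastWatermark) {
            lastWatermark = candidate;
        }
        return lastWatermark;
    }
}
```

Keeping the watermark monotonic matters: if the bound grows, the sketch holds the watermark until the maximum timestamp has advanced far enough, instead of emitting a smaller watermark.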

Best,
Congxian


Theo Diefenthal <[hidden email]> wrote on Sat, May 30, 2020 at 4:35 AM:

Re: Auto adjusting watermarks?

Theo
Hi Congxian,

Thanks for your feedback. You raised a point I had already thought about. Since "assignTimestampsAndWatermarks" creates an operator extending the standard AbstractUdfStreamOperator, I can also implement a RichFunction watermark assigner with full state access. In my case, I was also wondering whether it's a good idea to have a stateful watermark assigner, or whether it's more practical to start without state and build the histogram over time, anew with each job restart. That's also why I asked on the mailing list: to get feedback from other people who customize their watermark assigners.

Best regards
Theo


From: "Congxian Qiu" <[hidden email]>
To: "Theo Diefenthal" <[hidden email]>
CC: "user" <[hidden email]>
Sent: Saturday, May 30, 2020 05:06:12
Subject: Re: Auto adjusting watermarks?


Re: Auto adjusting watermarks?

Arvid Heise
Hi Theo,

The general idea is interesting. I'd probably start with some initial out-of-orderness bound and, after collecting X elements, switch to the histogram. Snapshotting the histogram sounds very reasonable. I'd probably use union state so that rescaling is also supported in a meaningful way.
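A rough plain-Java sketch of both suggestions: a fixed bound during a warm-up phase, and a histogram snapshot that survives rescaling. The names (`WarmUpHistogram`, `snapshot`, `restoreFromUnion`) are hypothetical, and wiring into Flink (for example via `CheckpointedFunction` and `OperatorStateStore#getUnionListState`) is omitted so the example runs standalone. With union list state, every parallel instance receives all entries on restore, so each one can merge the complete histogram regardless of the new parallelism. Because of that, the sketch lets only one instance (say, subtask index 0) re-snapshot the merged counts; if every instance did, the counts would be multiplied by the parallelism on the next restore.

```java
import java.util.List;

/**
 * Hypothetical sketch: fixed bound until enough delays were observed, then a
 * histogram percentile. Snapshots are long[] bucket counts meant for union list state.
 */
class WarmUpHistogram {
    private final long bucketWidthMs;
    private final long initialBoundMs;
    private final long warmUpElements; // "X elements" before trusting the histogram
    private final long[] restored;     // merged counts from the last restore
    private final long[] local;        // counts recorded by this instance since then
    private boolean carriesRestored;   // true for exactly one instance after a restore

    WarmUpHistogram(long bucketWidthMs, int numBuckets, long initialBoundMs, long warmUpElements) {
        this.bucketWidthMs = bucketWidthMs;
        this.initialBoundMs = initialBoundMs;
        this.warmUpElements = warmUpElements;
        this.restored = new long[numBuckets];
        this.local = new long[numBuckets];
    }

    /** Records one delay in ms; large delays are clamped into the last bucket. */
    void record(long delayMs) {
        int idx = (int) Math.min(Math.max(delayMs, 0) / bucketWidthMs, local.length - 1);
        local[idx]++;
    }

    /** Number of delays known to this instance (restored plus local). */
    long total() {
        long t = 0;
        for (int i = 0; i < local.length; i++) {
            t += restored[i] + local[i];
        }
        return t;
    }

    /** Fixed bound during warm-up, histogram percentile afterwards. */
    long bound(double quantile) {
        long total = total();
        if (total < warmUpElements) {
            return initialBoundMs;
        }
        long needed = (long) Math.ceil(quantile * total);
        long seen = 0;
        for (int i = 0; i < local.length; i++) {
            seen += restored[i] + local[i];
            if (seen >= needed) {
                return (i + 1) * bucketWidthMs;
            }
        }
        return local.length * bucketWidthMs;
    }

    /** Entry this instance would add to its union list state on checkpoint. */
    long[] snapshot() {
        long[] out = local.clone();
        if (carriesRestored) {
            for (int i = 0; i < out.length; i++) {
                out[i] += restored[i];
            }
        }
        return out;
    }

    /** Merges ALL union-state entries; only one instance should pass carriesRestored = true. */
    void restoreFromUnion(List<long[]> allEntries, boolean carriesRestored) {
        this.carriesRestored = carriesRestored;
        for (long[] entry : allEntries) {
            if (entry.length != restored.length) {
                throw new IllegalArgumentException("bucket layout changed");
            }
            for (int i = 0; i < entry.length; i++) {
                restored[i] += entry[i];
            }
        }
    }
}
```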

However, to be honest, for a production use case I'd probably go with a more deterministic (i.e., simpler) approach and tune it manually. If late events are something to worry about, I'd have a metric on them anyway, with alerts, and tune the job accordingly.
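The late-event metric could rest on bookkeeping as simple as the plain-Java sketch below; in a Flink job the two counters might instead be registered via `getRuntimeContext().getMetricGroup().counter(...)` and the alert threshold evaluated by the monitoring system. The class name and the example target of 0.001 (matching the 99.9% goal from the first message) are assumptions. An element counts as late when its timestamp is at or below the current watermark.

```java
/**
 * Hypothetical sketch of a late-event metric: counts elements whose timestamp
 * is at or below the current watermark and compares the late fraction with a target.
 */
class LateEventMonitor {
    private final double maxLateFraction; // e.g. 0.001 for "99.9% on time"
    private long seen;
    private long late;

    LateEventMonitor(double maxLateFraction) {
        this.maxLateFraction = maxLateFraction;
    }

    /** Returns true if the element is late with respect to the given watermark. */
    boolean observe(long timestampMs, long currentWatermarkMs) {
        seen++;
        boolean isLate = timestampMs <= currentWatermarkMs;
        if (isLate) {
            late++;
        }
        return isLate;
    }

    double lateFraction() {
        return seen == 0 ? 0.0 : (double) late / seen;
    }

    /** True when the observed late fraction exceeds the configured target. */
    boolean shouldAlert() {
        return lateFraction() > maxLateFraction;
    }
}
```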

Another dimension to look at is whether you can recalculate results based on late events, so that you get the best of both worlds (low initial latency, but precise final results). I'd also recommend having a look at the retract streams of the Table API.
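To illustrate the retraction idea in plain Java (this is not the Table API itself): whenever an element, late or not, changes a previously emitted result, the old result is retracted and the corrected one is emitted. The Table API's retract streams use a similar encoding, `Tuple2<Boolean, T>` records where `true` marks an add message and `false` a retraction. The `RetractingCounter` class and its string keys (e.g. a window identifier) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of retraction-style updates: each changed result is emitted
 * as a retraction of the old value followed by an add of the corrected value.
 */
class RetractingCounter {
    /** One change record: add == true inserts a row, add == false retracts it. */
    static final class Change {
        final boolean add;
        final String key;
        final long count;

        Change(boolean add, String key, long count) {
            this.add = add;
            this.key = key;
            this.count = count;
        }

        @Override
        public String toString() {
            return (add ? "+" : "-") + "(" + key + "," + count + ")";
        }
    }

    private final Map<String, Long> counts = new HashMap<>();

    /** Processes one element (on time or late) and returns the resulting changes. */
    List<Change> add(String key) {
        List<Change> out = new ArrayList<>();
        Long old = counts.get(key);
        long updated = (old == null ? 0 : old) + 1;
        if (old != null) {
            out.add(new Change(false, key, old)); // retract the previously emitted result
        }
        out.add(new Change(true, key, updated)); // emit the corrected result
        counts.put(key, updated);
        return out;
    }
}
```

Downstream consumers then have to handle retractions, which is the price for low-latency early results that are corrected later.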

On Sat, May 30, 2020 at 10:04 PM Theo Diefenthal <[hidden email]> wrote:


--

Arvid Heise | Senior Java Developer


Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng