Counting elements that appear "behind" the watermark

Julio Biason
Hello,

Our current watermark model is "some time behind the most recent seen element" (very close to what the docs have in "Periodic Watermark" https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html#with-periodic-watermarks). It fits our current processing model.

The thing is, we want to extract information about elements that arrive behind the watermark, to get some insight into when we need to adjust how far behind the most recent element the watermark sits. The problem is, I can't create any metrics inside the AssignerWithPeriodicWatermarks because it has no `getRuntimeContext()` to attach the metric to.

Is there any way we can count those (a ProcessFunction before the .assignTimestampsAndWatermarks(), maybe)?

--
Julio Biason, Software Engineer
AZION  |  Deliver. Accelerate. Protect.
Office: +55 51 3083 8101  |  Mobile: +55 51 99907 0554

Re: Counting elements that appear "behind" the watermark

Hequn Cheng
Hi Julio,

If I understand correctly, you want to adjust your watermarks automatically?
It is true that there is no direct way to get metrics from the AssignerWithPeriodicWatermarks. Adding a ProcessFunction before assignTimestampsAndWatermarks seems like a solution. In the ProcessFunction, you can count the late elements and send the count downstream to assignTimestampsAndWatermarks to adjust the watermarks.

Best, Hequn


Re: Counting elements that appear "behind" the watermark

Elias Levy
In reply to this post by Julio Biason
You can create a ProcessFunction. That gives you access to `getRuntimeContext()` to register metrics, to the element timestamp, and to the current watermark. Keep in mind that operators first process a record and then process any watermark that resulted from that record, so when you read the current watermark inside the `processElement` method, it does not yet include the watermark generated from that element.
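The ordering described above can be illustrated with a small, Flink-free simulation. This is only a sketch under stated assumptions: the class `LatenessSimulation`, its method names, and the 5-second delay are hypothetical, and the watermark is advanced after every element, whereas a periodic assigner in Flink only emits it at intervals. An element is treated as late when its timestamp is at or below the watermark derived from earlier elements.

```java
/** Flink-free model of a watermark that trails the newest timestamp by a fixed delay. */
public class LatenessSimulation {
    private final long maxDelayMs;                      // hypothetical allowed delay
    private long maxSeenTimestamp = Long.MIN_VALUE;     // newest timestamp seen so far
    private long currentWatermark = Long.MIN_VALUE;     // watermark from earlier elements
    private long lateCount = 0;
    private long worstLatenessMs = 0;

    public LatenessSimulation(long maxDelayMs) {
        this.maxDelayMs = maxDelayMs;
    }

    public void onElement(long timestamp) {
        // Step 1: judge the element against the watermark produced by
        // earlier elements only (the record is processed first).
        if (timestamp <= currentWatermark) {
            lateCount++;
            worstLatenessMs = Math.max(worstLatenessMs, currentWatermark - timestamp);
        }
        // Step 2: only afterwards advance the watermark using this element.
        maxSeenTimestamp = Math.max(maxSeenTimestamp, timestamp);
        currentWatermark = maxSeenTimestamp - maxDelayMs;
    }

    public long lateCount() {
        return lateCount;
    }

    public long worstLatenessMs() {
        return worstLatenessMs;
    }

    public long currentWatermark() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        LatenessSimulation sim = new LatenessSimulation(5_000L);
        long[] timestamps = {10_000L, 20_000L, 12_000L, 16_000L, 30_000L, 24_000L};
        for (long ts : timestamps) {
            sim.onElement(ts);
        }
        System.out.println("late elements: " + sim.lateCount());
        System.out.println("worst lateness (ms): " + sim.worstLatenessMs());
    }
}
```

Tracking the worst observed lateness alongside the count gives a rough idea of how much larger the delay would need to be to cover the late elements seen so far.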


Re: Counting elements that appear "behind" the watermark

Julio Biason
Hey Elias,

Thanks for the tips. Unfortunately, it seems `Context` only has information about the element being processed (https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java#L91) and the RuntimeContext doesn't have access to any watermark information (https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L57).


Re: Counting elements that appear "behind" the watermark

Elias Levy
Correct. Context gives you access to the element timestamp. But it also gives you access to the current watermark via `timerService().currentWatermark()`.
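Put together, the approach from this thread might look roughly like the sketch below. It assumes the Flink 1.x DataStream API is on the classpath (in particular the `open(Configuration)` lifecycle method) and that the function runs downstream of `assignTimestampsAndWatermarks`, since an operator placed before the assigner would not see the watermarks it generates. The class name `LateElementCounter` and the metric name `lateElements` are hypothetical.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/** Pass-through function that counts elements arriving at or behind the current watermark. */
public class LateElementCounter<T> extends ProcessFunction<T, T> {
    private static final long serialVersionUID = 1L;

    // Metrics are registered per task at runtime, so the field is not serialized.
    private transient Counter lateElements;

    @Override
    public void open(Configuration parameters) {
        // "lateElements" is a hypothetical metric name chosen for this sketch.
        lateElements = getRuntimeContext().getMetricGroup().counter("lateElements");
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        // Timestamp of the element; null if no timestamp has been assigned.
        Long timestamp = ctx.timestamp();
        // Watermark as of the previous records; it does not yet reflect
        // any watermark caused by the element being processed now.
        long watermark = ctx.timerService().currentWatermark();

        if (timestamp != null && timestamp <= watermark) {
            lateElements.inc();
        }
        // Forward every element unchanged, late or not.
        out.collect(value);
    }
}
```

Because the function forwards every element unchanged, it only observes lateness and does not alter what the rest of the pipeline receives.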


Re: Counting elements that appear "behind" the watermark

Julio Biason
Awesome, thanks Elias!
