Hello,

Our current watermark model is "some time behind the most recently seen element" (very close to what the docs describe under "Periodic Watermarks": https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html#with-periodic-watermarks). It fits our current processing model.

We want to extract information about elements that arrive behind the watermark, to give us some insight into when we need to change how far behind the most recently seen element the watermark sits.

The problem is that I can't create any metrics inside the AssignerWithPeriodicWatermarks, because it has no `getRuntimeContext()` to attach the metric to. Is there any way we can count those elements (a ProcessFunction before the .assignTimestampsAndWatermarks(), maybe)?

--
Julio Biason, Software Engineer
AZION | Deliver. Accelerate. Protect.
Office: +55 51 3083 8101 | Mobile: +55 51 99907 0554
Hi Julio,

If I understand correctly, you want to adjust your watermarks automatically? It is true that there is no direct way to register a metric from the AssignerWithPeriodicWatermarks. Adding a ProcessFunction before assignTimestampsAndWatermarks seems like a solution: in the ProcessFunction you can count the late elements and send that count downstream to assignTimestampsAndWatermarks to adjust the watermarks.

Best, Hequn

On Tue, Jul 31, 2018 at 1:32 AM, Julio Biason <[hidden email]> wrote:
In reply to this post by Julio Biason
You can create a ProcessFunction. That gives you access to getRuntimeContext() to register metrics, to the element timestamp, and to the current watermark. Keep in mind that an operator first processes a record and only then processes any watermark that resulted from that record, so the watermark you read inside processElement does not yet include the one generated from the element being processed.

On Mon, Jul 30, 2018 at 10:33 AM Julio Biason <[hidden email]> wrote:
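The ordering described in the reply above (the record is handled first, the watermark it causes only afterwards) can be illustrated with a small model that does not depend on Flink at all. This is only a sketch under stated assumptions: the class `WatermarkLagModel` and its methods are hypothetical names, the watermark is advanced after every element rather than on a periodic timer, and an element is treated as late when its timestamp is less than or equal to the watermark.

```java
// Hypothetical, Flink-free model of "watermark = max timestamp seen - fixed delay".
// Simplification: the watermark advances after each element instead of on a timer.
class WatermarkLagModel {
    private final long maxDelayMs;
    private long maxTimestampSeen = Long.MIN_VALUE;
    private long currentWatermark = Long.MIN_VALUE;
    private long lateCount = 0;

    WatermarkLagModel(long maxDelayMs) {
        this.maxDelayMs = maxDelayMs;
    }

    /** Processes one element timestamp and returns true if it was late. */
    boolean process(long timestamp) {
        // Step 1: the element is compared against the watermark as it stood
        // BEFORE this element arrived.
        boolean late = timestamp <= currentWatermark;
        if (late) {
            lateCount++;
        }
        // Step 2: only afterwards does the watermark caused by this element
        // take effect. Watermarks never move backwards.
        maxTimestampSeen = Math.max(maxTimestampSeen, timestamp);
        currentWatermark = Math.max(currentWatermark, maxTimestampSeen - maxDelayMs);
        return late;
    }

    long currentWatermark() {
        return currentWatermark;
    }

    long lateCount() {
        return lateCount;
    }
}
```

Feeding a recorded sample of timestamps through such a model with different delay values gives a rough idea of how many elements each candidate delay would have classified as late.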
Hey Elias,

Thanks for the tips. Unfortunately, it seems `Context` only has information about the element being processed (https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java#L91), and the RuntimeContext doesn't have access to any watermark information (https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L57).

On Mon, Jul 30, 2018 at 10:28 PM, Elias Levy <[hidden email]> wrote:
Correct. Context gives you access to the element timestamp. But it also gives you access to the current watermark, via `timerService().currentWatermark()`.
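Putting the pieces from this thread together, a late-element counter might look roughly like the sketch below. It assumes the Flink 1.x `ProcessFunction` API (with `open(Configuration)`); the class name `LateElementCounter` and the metric name `lateElements` are made up for illustration. It also assumes the function runs after `assignTimestampsAndWatermarks`, since upstream of the assigner the elements may carry no timestamp and no watermark has been generated yet.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical pass-through function that counts elements behind the watermark.
public class LateElementCounter<T> extends ProcessFunction<T, T> {

    // Metric handle; registered in open() because it is not serializable state.
    private transient Counter lateElements;

    @Override
    public void open(Configuration parameters) {
        // "lateElements" is an illustrative metric name, not a Flink built-in.
        lateElements = getRuntimeContext()
                .getMetricGroup()
                .counter("lateElements");
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        // Null when the element carries no timestamp.
        Long timestamp = ctx.timestamp();
        // Watermark as seen by this operator BEFORE the current element
        // has had any effect on it.
        long watermark = ctx.timerService().currentWatermark();

        // Assumption: "late" means timestamp <= current watermark.
        if (timestamp != null && timestamp <= watermark) {
            lateElements.inc();
        }

        // Forward every element unchanged; this function only observes.
        out.collect(value);
    }
}
```

Under those assumptions it would be wired in as `stream.assignTimestampsAndWatermarks(assigner).process(new LateElementCounter<>())`, where `stream` and `assigner` stand for the job's existing stream and watermark assigner.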
On Tue, Jul 31, 2018 at 7:45 AM Julio Biason <[hidden email]> wrote:
Awesome, thanks Elias! On Tue, Jul 31, 2018 at 10:02 PM, Elias Levy <[hidden email]> wrote: