context in OnElement of Trigger

context in OnElement of Trigger

Vishal Santoshi
What does TriggerContext.getWaterMark() represent? Is it the system-level watermark for the window operator? When I pull it out I get a value of 0 a whole bunch of times ...
Re: context in OnElement of Trigger

Vishal Santoshi
I think I got the 0 wrong. I do, however, want to figure out how much I can depend on this watermark to represent the progress of the window.

On Sat, Dec 16, 2017 at 9:06 PM, Vishal Santoshi <[hidden email]> wrote:
What does TriggerContext.getWaterMark() represent? Is it the system-level watermark for the window operator? When I pull it out I get a value of 0 a whole bunch of times ...

Re: context in OnElement of Trigger

Fabian Hueske-2
Hi,

TriggerContext.getWaterMark() returns the current watermark (i.e., event time) of the window operator.
An operator tracks the maximum watermark received on each of its inputs and computes its own watermark as the minimum of these maximums.
Until an operator has received watermarks from all of its inputs, its own watermark is Long.MIN_VALUE.

So the watermark can indeed be used as an indicator of time progress, but it is not initialized until the operator has received watermarks from all of its inputs.
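
The min-of-max rule described above can be sketched in plain Java. This is only a simulation under stated assumptions, not Flink's internal code: the class name OperatorWatermarkSketch and its methods are hypothetical, and inputs are identified by a simple index. (In Flink's Trigger.TriggerContext the accessor is, to my knowledge, named getCurrentWatermark().)

```java
import java.util.Arrays;

// Hypothetical simulation of the rule above; not a Flink class.
final class OperatorWatermarkSketch {
    // Highest watermark seen so far on each input.
    private final long[] maxPerInput;

    OperatorWatermarkSketch(int numInputs) {
        maxPerInput = new long[numInputs];
        // No input has reported yet, so every maximum starts at Long.MIN_VALUE.
        Arrays.fill(maxPerInput, Long.MIN_VALUE);
    }

    /** Records a watermark from one input and returns the operator's watermark. */
    long onWatermark(int input, long watermark) {
        maxPerInput[input] = Math.max(maxPerInput[input], watermark);
        return current();
    }

    /** Minimum over the per-input maxima; Long.MIN_VALUE until every input has reported. */
    long current() {
        long min = Long.MAX_VALUE;
        for (long w : maxPerInput) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        OperatorWatermarkSketch op = new OperatorWatermarkSketch(2);
        System.out.println(op.onWatermark(0, 1_000L)); // second input still silent
        System.out.println(op.onWatermark(1, 400L));   // both inputs have reported
        System.out.println(op.onWatermark(1, 2_500L)); // now held back by input 0
    }
}
```

The point of the sketch is that a single silent input keeps the operator's watermark at Long.MIN_VALUE, and afterwards the slowest input determines the progress that a trigger observes.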

Hope this helps.

Best, Fabian

2017-12-17 3:24 GMT+01:00 Vishal Santoshi <[hidden email]>:
I think I got the 0 wrong. I do, however, want to figure out how much I can depend on this watermark to represent the progress of the window.



Re: context in OnElement of Trigger

Vishal Santoshi

Thank you, that is very helpful.


This is a thought experiment, so do bear with me.


Our objective is to find paths (funnel analysis) in a sessionized stream. The source streams (Kafka partition sources) have been watermarked. A path is A[*]B, as in it starts at A and ends at B; a funnel, to be more precise.



If your contention is true, that is, that the watermark in the TriggerContext is indeed a reflection of the event-time progress of the stream, then emitting with a time-based trigger, say every n seconds of watermark progression, implies that the window will have received all events up to the operator's sense of the watermark across its incident sources. The resulting approximate order can be exploited: elements within the batch between two emits may be unordered, but the batches themselves are ordered. We would essentially do a FIRE (no purge) every n seconds, look at the items within the pane, and reduce the elements by extracting the paths realized, trimming the elements either fully or up to a dangling A (one whose B has not yet appeared in the stream), which drastically reduces the state size. (A[*]B paths are not numerous; they are very specific funnels, registration etc.)
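
The reduce-and-trim step described above might look roughly like the following. It is a sketch under assumptions that are not in the thread: Event and FunnelReducer are hypothetical names, events carry only a type and a timestamp, events that precede the first A are dropped, and a second A seen while a path is open is treated as part of the [*] segment.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical event shape: a type label ("A", "B", or anything else) and an event timestamp.
final class Event {
    final String type;
    final long timestamp;

    Event(String type, long timestamp) {
        this.type = type;
        this.timestamp = timestamp;
    }
}

// Hypothetical helper that would run on each FIRE over the pane contents.
final class FunnelReducer {
    /**
     * Sorts the pane by timestamp, appends every completed A[*]B path to
     * completedOut, and returns the events that must remain in state:
     * a dangling A plus whatever followed it, or an empty list.
     */
    static List<Event> reduce(List<Event> pane, List<List<Event>> completedOut) {
        List<Event> sorted = new ArrayList<>(pane);
        // Elements inside one batch may be unordered, so order them first.
        sorted.sort(Comparator.comparingLong(e -> e.timestamp));

        List<Event> open = new ArrayList<>(); // empty means no path in progress
        for (Event e : sorted) {
            if (open.isEmpty()) {
                if (e.type.equals("A")) {
                    open.add(e); // a path starts here
                }
                // anything before the first A is dropped (assumption)
            } else {
                open.add(e);
                if (e.type.equals("B")) {
                    completedOut.add(new ArrayList<>(open)); // path realized
                    open.clear();
                }
            }
        }
        return open;
    }
}
```

Only the returned list would need to stay in window state after a firing, which is where the reduction in state size would come from.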


That raises a question (though I would be disappointed if that is not the case):


Does a time-based trigger fire based on its sense of the watermark? Say we wish to fire every 1 minute: is the firing driven by the watermark progressing by a minute rather than by a wall-clock minute? If that is not the case, will we have to write a custom trigger?
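
To make the distinction in the question concrete, here is a plain-Java simulation of firing on watermark progress rather than on wall-clock time. It does not use Flink's API: EventTimeIntervalFiring and its methods are hypothetical, and counting one firing per crossed boundary when the watermark jumps is a choice made for this sketch. Flink does ship a ContinuousEventTimeTrigger that works on event-time intervals; whether it fits this use case is not settled in the thread.

```java
// Hypothetical simulation; no wall-clock time is consulted anywhere.
final class EventTimeIntervalFiring {
    private final long intervalMs;
    // Next event-time boundary at which to fire; Long.MIN_VALUE means "not set yet".
    private long nextFireTime = Long.MIN_VALUE;

    EventTimeIntervalFiring(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    /** On the first element, aligns the first firing to the next interval boundary. */
    void onElement(long timestamp) {
        if (nextFireTime == Long.MIN_VALUE) {
            long start = timestamp - Math.floorMod(timestamp, intervalMs);
            nextFireTime = start + intervalMs;
        }
    }

    /** Returns how many interval boundaries the given watermark has passed. */
    int onWatermark(long watermark) {
        int fires = 0;
        while (nextFireTime != Long.MIN_VALUE && watermark >= nextFireTime) {
            fires++;                    // corresponds to a FIRE without purge
            nextFireTime += intervalMs; // schedule the next boundary
        }
        return fires;
    }
}
```

For example, with a one-minute interval and a first element at 65,000 ms, the first firing happens only once the watermark reaches 120,000 ms, however much wall-clock time has passed in between.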


Thank you for helping out.


On Mon, Dec 18, 2017 at 5:09 AM, Fabian Hueske <[hidden email]> wrote:
Hi,

TriggerContext.getWaterMark() returns the current watermark (i.e., event time) of the window operator.
An operator tracks the maximum watermark received on each of its inputs and computes its own watermark as the minimum of these maximums.
Until an operator has received watermarks from all of its inputs, its own watermark is Long.MIN_VALUE.

So the watermark can indeed be used as an indicator of time progress, but it is not initialized until the operator has received watermarks from all of its inputs.

Hope this helps.

Best, Fabian
