Early firing window implementation issue

simpleusr
Hi,

I am having quite a hard time understanding Flink's windowing principles and would be very grateful if you could point me in the right direction.

My goal is to count the number of recurring events within a time interval and generate an alert event if that count is greater than a threshold.

As I understand it, windowing is a perfect match for this scenario.

An additional requirement is to generate an early alert if the recurring event count in a window is 2 (i.e. the alert should be generated without waiting for the window to end).

I thought that a process window function could be used to aggregate the windowed events and generate the alert events, and that a custom trigger could be used to emit early results from the window based on the recurring event count (before the watermark reaches the window’s end timestamp).

I am using event-time semantics and have some problems and questions regarding the custom trigger.

You can find the actual implementation in the gist:

https://gist.github.com/simpleusr/7c56d4384f6fc9f0a61860a680bb5f36
 
 
I am using keyed state to keep track of the element count in the window (encounteredElementsCountState).

Upon receiving the first element, I register an event-time timer for the window end. This is supposed to trigger FIRE_AND_PURGE when the window closes, and it works as expected.

If the count exceeds the threshold, I try to trigger an early fire. This also seems to work: the process window function is called immediately after the trigger fires.
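To make the intended behaviour concrete, here is a minimal plain-Java model of the trigger decisions described above. It is only a sketch and does not use the Flink API: EarlyFireModel and Decision are hypothetical stand-ins (the enum merely borrows the names of Flink's TriggerResult values), and firing exactly once, when the count reaches the early threshold, is an assumption rather than a copy of what the gist does.

```java
public class EarlyFireModel {
    // Stand-in for Flink's TriggerResult; NOT the Flink type.
    enum Decision { CONTINUE, FIRE, FIRE_AND_PURGE }

    private final long windowEnd;      // timestamp at which the window closes
    private final int earlyThreshold;  // count at which an early result is emitted (assumed)
    private int count = 0;             // plays the role of encounteredElementsCountState
    private boolean timerRegistered = false;

    EarlyFireModel(long windowEnd, int earlyThreshold) {
        this.windowEnd = windowEnd;
        this.earlyThreshold = earlyThreshold;
    }

    // Called once per element assigned to the window.
    Decision onElement() {
        if (!timerRegistered) {
            // The first element "registers" the end-of-window timer.
            timerRegistered = true;
        }
        count++;
        // Emit an early result only at the moment the threshold is reached.
        return count == earlyThreshold ? Decision.FIRE : Decision.CONTINUE;
    }

    // Called when event time advances to a registered timer.
    Decision onEventTime(long time) {
        return (timerRegistered && time == windowEnd)
                ? Decision.FIRE_AND_PURGE
                : Decision.CONTINUE;
    }

    // Called when the window is disposed; resets everything the model holds.
    void clear() {
        count = 0;
        timerRegistered = false;
    }

    int count() {
        return count;
    }

    public static void main(String[] args) {
        EarlyFireModel model = new EarlyFireModel(1000L, 2);
        System.out.println(model.onElement());        // CONTINUE
        System.out.println(model.onElement());        // FIRE (early alert)
        System.out.println(model.onElement());        // CONTINUE
        System.out.println(model.onEventTime(1000L)); // FIRE_AND_PURGE
        model.clear();
        System.out.println(model.count());            // 0
    }
}
```

In this model each element reaches onElement exactly once, so the early FIRE happens a single time per window. That is the behaviour I expected from the real trigger as well.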

The problem is that I had to insert the check below into the code without understanding why it is needed, because the previously collected elements were being supplied to the onElement method again ...

            if (ctx.getCurrentWatermark() < 0) {
                logger.debug(String.format("onElement processing skipped for eventId : %s for watermark: %s ", element.getEventId(), ctx.getCurrentWatermark()));
                return TriggerResult.CONTINUE;
            }
                                      

I could not figure out the reason. What I see is that when this happens, the watermark value (ctx.getCurrentWatermark()) is Long.MIN_VALUE, which led to the above check. How can this happen?

This check seems to prevent duplicate early alerts, but I do not know why the duplication happens or whether this workaround is appropriate.

Could you please advise why the same elements are processed twice in the window?

Another question is about the keyed state usage. Does this implementation leak any state after the window is disposed? I am trying to clear all the state I use in the trigger's clear method, but would that be enough?
 

Regards

Re: Early firing window implementation issue

simpleusr
Hi all

Any recommendations on this issue?

Regards

On Thu, 25 Jul 2019 at 22:37, Ceyhan Kasap <[hidden email]> wrote: