Session Window with Custom Trigger

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

Session Window with Custom Trigger

KristoffSC
Hi all,
I'm using Flink 1.9.2 and I would like to ask about my use case and approach
I've took to meet it.

The use case:
I have a keyed stream, where I have to buffer messages with logic:
1. Buffering should start only when message arrives.
2. The max buffer time should not be longer than 3 seconds
3. Each new message should NOT prolong the buffer time.
4. If particular business condition will be meet, buffering should stop and
all messages should be let through further processing.

The business logic in point 4 is taking under the consideration data from
previously buffered messages in this time buffer session.

My setup for this is
1.keyedStream with ProcessingTimeSessionWindow (I dont need EventTime for
this).
2. Custom Trigger

The custom trigger:
1. keeps some data in its state under AggregatingStateDescriptor allowing me
to override "merge" method from Trigger class.

2. In onElement method, for the first call I execute
ctx.registerEventTimeTimer(window.maxTimestamp());
Additionally in this method I added the busioenss logic which returns
TriggerResult.FIRE or TriggerResult.CONTINUE

3. The onProcessingTime methods returns TriggerResult.FIRE

3. all other methods are returning TriggerResult.CONTINUE



As a result, I can observe that my window is fired two times. One from
onElement method where the busienss condition is meet and second time from
onProcessingTime method.

What is the best way to prevent this?

Regards,
Krzysztof



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Session Window with Custom Trigger

KristoffSC
One addition:
in clear method of my custom trigger I do call
ctx.deleteProcessingTimeTimer(window.maxTimestamp());



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Session Window with Custom Trigger

Marco Villalobos-2
In reply to this post by KristoffSC
Hi Kristoff,

> On Jun 23, 2020, at 6:52 AM, KristoffSC <[hidden email]> wrote:
>
> Hi all,
> I'm using Flink 1.9.2 and I would like to ask about my use case and approach
> I've took to meet it.
>
> The use case:
> I have a keyed stream, where I have to buffer messages with logic:
> 1. Buffering should start only when message arrives.
> 2. The max buffer time should not be longer than 3 seconds
> 3. Each new message should NOT prolong the buffer time.
> 4. If particular business condition will be meet, buffering should stop and
> all messages should be let through further processing.

As an alternate solution that does not use custom triggers, 1-4 can be handled with a Tumbling window, and a ProcessWindowFunction.

When you override the  "process" method, you'll have Iterable list of events that happened in that window.
There you can check your business condition specified in step 4.

>
> The business logic in point 4 is taking under the consideration data from
> previously buffered messages in this time buffer session.
>
> My setup for this is
> 1.keyedStream with ProcessingTimeSessionWindow (I dont need EventTime for
> this).
> 2. Custom Trigger
>
> The custom trigger:
> 1. keeps some data in its state under AggregatingStateDescriptor allowing me
> to override "merge" method from Trigger class.
>
> 2. In onElement method, for the first call I execute
> ctx.registerEventTimeTimer(window.maxTimestamp());
> Additionally in this method I added the busioenss logic which returns
> TriggerResult.FIRE or TriggerResult.CONTINUE
>
> 3. The onProcessingTime methods returns TriggerResult.FIRE
>
> 3. all other methods are returning TriggerResult.CONTINUE
>
>
>
> As a result, I can observe that my window is fired two times. One from
> onElement method where the busienss condition is meet and second time from
> onProcessingTime method.
>
> What is the best way to prevent this?
>
> Regards,
> Krzysztof
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply | Threaded
Open this post in threaded view
|

Re: Session Window with Custom Trigger

KristoffSC
Hi Marco Villalobos-2
unfortunately I don't think Tumbling window will work in my case.

The reasons:
1. Window must start only when there is a new event, and previous window is
closed. The new Tumbling window is created just after previews one is
purged. In my case I have to use SessionWindow where Session windows do not
overlap and do not have a fixed start and end time, in contrast to tumbling
windows and sliding windows [1].

2. My logic required to close the window earlier, before Window maxTime
hence custom trigger.


The issue I'm having though is that My Trigger firres the windwo two times.
First time from onElement method and secodn time from onProcessignTime
method.




[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Session Window with Custom Trigger

KristoffSC
It seems that I'm clearing the timers in a right way, but there is a new
timer created from WindowOperator::registerCleanupTimer method. This one is
called from WindowOperator::processElement at the end of both if/else
branches.

How can I mitigate this? I dont want to have any "late firings" for my
windows. The windwo should be fully closed after Trigger::onElement or
Trigger::onProcessingTimne method if there was no onElement Fire result
created.





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Session Window with Custom Trigger

KristoffSC
I think I've figured it out.

I switched to GlobalWidnow with my custom trigger. My Trigger combines
processingTime trigger logic and onElement trigger logic. Only one should be
executed in scope of particular window.

I managed to do this by returning FIRE_AND_PURGE and cleat all timers and
state whenever I'm closing the window.

In my case I don't have "late events" that should be added into previously
ended window so it simplifies the job.

Thanks :)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/