Hi all,
I'm using Flink 1.9.2 and would like to ask about my use case and the approach I've taken to meet it.

The use case:
I have a keyed stream where I have to buffer messages with the following logic:
1. Buffering should start only when a message arrives.
2. The max buffer time should not be longer than 3 seconds.
3. Each new message should NOT prolong the buffer time.
4. If a particular business condition is met, buffering should stop and all messages should be let through for further processing.

The business logic in point 4 takes into consideration data from messages previously buffered in this buffer session.

My setup for this is:
1. keyedStream with ProcessingTimeSessionWindow (I don't need EventTime for this).
2. Custom Trigger

The custom trigger:
1. Keeps some data in its state under an AggregatingStateDescriptor, which allows me to override the "merge" method from the Trigger class.
2. In the onElement method, on the first call I execute ctx.registerEventTimeTimer(window.maxTimestamp()); Additionally, in this method I added the business logic, which returns TriggerResult.FIRE or TriggerResult.CONTINUE.
3. The onProcessingTime method returns TriggerResult.FIRE.
4. All other methods return TriggerResult.CONTINUE.

As a result, I can observe that my window is fired two times: once from the onElement method, where the business condition is met, and a second time from the onProcessingTime method.

What is the best way to prevent this?

Regards,
Krzysztof

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
One addition:
in the clear method of my custom trigger I call ctx.deleteProcessingTimeTimer(window.maxTimestamp());
In reply to this post by KristoffSC
Hi Kristoff,
> On Jun 23, 2020, at 6:52 AM, KristoffSC <[hidden email]> wrote:
>
> The use case:
> I have a keyed stream, where I have to buffer messages with logic:
> 1. Buffering should start only when a message arrives.
> 2. The max buffer time should not be longer than 3 seconds.
> 3. Each new message should NOT prolong the buffer time.
> 4. If a particular business condition is met, buffering should stop and
> all messages should be let through for further processing.

As an alternative solution that does not use custom triggers, points 1-4 can be handled with a tumbling window and a ProcessWindowFunction. When you override the "process" method, you get an Iterable of the events that happened in that window. There you can check the business condition specified in point 4.
Hi Marco,
unfortunately I don't think a tumbling window will work in my case. The reasons:
1. The window must start only when there is a new event and the previous window is closed. A new tumbling window is created right after the previous one is purged. In my case I have to use a SessionWindow, because session windows do not overlap and do not have a fixed start and end time, in contrast to tumbling and sliding windows [1].
2. My logic requires closing the window early, before the window's max timestamp, hence the custom trigger.

The issue I'm having, though, is that my trigger fires the window two times: the first time from the onElement method and the second time from the onProcessingTime method.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
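To make the "fixed start and end time" point concrete, here is a small plain-Java sketch (no Flink dependency) of how a tumbling window's boundaries follow from the clock rather than from the first message. It assumes non-negative timestamps and a zero window offset, in which case the arithmetic matches what Flink's TimeWindow.getWindowStartWithOffset computes. The class and method names are made up for illustration.

```java
/** Illustration only: epoch-aligned tumbling window arithmetic, not Flink code. */
final class TumblingAlignment {

    /** Start of the tumbling window that contains the given timestamp (offset 0, timestamp >= 0). */
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    /** How long a message arriving at firstArrival can be buffered before its window ends. */
    static long remainingBufferMillis(long firstArrival, long windowSize) {
        long windowEnd = windowStart(firstArrival, windowSize) + windowSize;
        return windowEnd - firstArrival;
    }

    public static void main(String[] args) {
        long size = 3_000L; // the 3 second limit from the use case
        for (long arrival : new long[] {9_000L, 10_500L, 11_900L}) {
            long start = windowStart(arrival, size);
            System.out.printf("arrival=%d ms -> window [%d, %d), buffered for at most %d ms%n",
                    arrival, start, start + size, remainingBufferMillis(arrival, size));
        }
    }
}
```

With a 3 second tumbling window, a first message arriving at 11,900 ms lands in the window [9000, 12000) and is buffered for only 100 ms, so requirement 1 (the 3 seconds start counting at the first message) is not met.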
It seems that I'm clearing the timers the right way, but there is a new timer created by the WindowOperator::registerCleanupTimer method. It is called from WindowOperator::processElement at the end of both if/else branches.

How can I mitigate this? I don't want to have any "late firings" for my windows. The window should be fully closed after Trigger::onElement fires, or after Trigger::onProcessingTime if onElement produced no FIRE result.
I think I've figured it out.
I switched to GlobalWindow with my custom trigger. My trigger combines the processing-time trigger logic and the onElement trigger logic; only one of them should be executed in the scope of a particular window. I managed to do this by returning FIRE_AND_PURGE and clearing all timers and state whenever I close the window. In my case I don't have "late events" that should be added to a previously ended window, so that simplifies the job.

Thanks :)
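The "only one of the two paths may fire" logic described above can be modelled without Flink. The sketch below is a dependency-free model, not the poster's trigger and not Flink code: BufferSession, its Result enum, and the Predicate-based business condition are hypothetical names chosen for illustration. In a real Trigger on a GlobalWindow, the deadline would presumably live in trigger state, and closing would also call ctx.deleteProcessingTimeTimer for the pending timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Dependency-free model of a buffer that fires exactly once per session; not Flink code. */
final class BufferSession<T> {

    /** Mirrors the two TriggerResult values mentioned in the thread. */
    enum Result { CONTINUE, FIRE_AND_PURGE }

    private final long maxBufferMillis;
    private final Predicate<List<T>> businessCondition; // hypothetical stand-in for point 4
    private final List<T> buffer = new ArrayList<>();
    private final List<List<T>> emitted = new ArrayList<>();
    private Long deadline; // null means: no open buffer and no pending timer

    BufferSession(long maxBufferMillis, Predicate<List<T>> businessCondition) {
        this.maxBufferMillis = maxBufferMillis;
        this.businessCondition = businessCondition;
    }

    /** Called for every message; `now` is the current processing time in milliseconds. */
    Result onElement(T element, long now) {
        if (deadline == null) {
            // The first message starts the clock; later messages do not extend it.
            deadline = now + maxBufferMillis;
        }
        buffer.add(element);
        return businessCondition.test(buffer) ? close() : Result.CONTINUE;
    }

    /** Called when a timer goes off; timers of already closed buffers are ignored. */
    Result onProcessingTime(long timerTimestamp) {
        if (deadline == null || deadline != timerTimestamp) {
            return Result.CONTINUE;
        }
        return close();
    }

    /** Emits the buffer, then drops the contents and the deadline so the other path cannot fire. */
    private Result close() {
        emitted.add(new ArrayList<>(buffer));
        buffer.clear();
        deadline = null;
        return Result.FIRE_AND_PURGE;
    }

    List<List<T>> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        BufferSession<Integer> session = new BufferSession<>(3_000L, buf -> buf.size() >= 2);
        System.out.println(session.onElement(1, 1_000L));      // CONTINUE
        System.out.println(session.onElement(2, 2_000L));      // FIRE_AND_PURGE
        System.out.println(session.onProcessingTime(4_000L));  // CONTINUE (stale timer)
        System.out.println(session.emitted());                 // [[1, 2]]
    }
}
```

The key design choice is that closing resets everything in one place, so whichever path closes the buffer first leaves nothing behind for the other path to fire on.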