Hello,

I'm trying to use a custom trigger for one of my use cases. I have basic logic (shown below) that uses keyBy on the input stream with a window of 1 min.

    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .trigger(new CustomTrigger())
    .aggregate(Input.getAggregationFunction(), new AggregationProcessingWindow());

My custom trigger is expected to fire the first event of each key instantly; any subsequent events should be aggregated in the window.

    .trigger(new Trigger<Record, TimeWindow>() {

Currently, for each window and the same key, the first event of the window is always fired. But I want this to happen only in the first window; each subsequent window should aggregate all of its events and then fire.

Example: all the records have the same key.

Current output:
record 1 : first event in window-1 : fired
record 2 : last event in window-1 : fired
record 3 : first event in window-2 : fired
record 4, record 5 : 2 events in window-2 : fired

Expected output:
record 1 : first event in window-1 : fired
record 2 : last event in window-1 : fired
record 3,4,5 : all events in window-2 : fired

Window-2 should not fire the first event of the same key. I'm reading about it here https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#fire-and-purge but have not been able to solve it. Any pointers would be helpful.

Thanks.
|
Hello, posting again for help. I'm planning to use state TTL but would like to know if there is any other way to do it. I'm using Flink 1.11. Thanks!

---------- Forwarded message ---------
From: Diwakar Jha <[hidden email]>
Date: Mon, Feb 22, 2021 at 6:28 PM
Subject: Flink custom trigger use case
To: user <[hidden email]>
|
Hi,

I've noticed that you are using an event-time window, but the trigger fires based on processing time. You should also register an event-time timer (for the window end) so that trigger.onEventTime() is called. It's also safer to check whether the state (firstSeen) value is true, not just whether it exists.

Regards,
Roman

On Tue, Feb 23, 2021 at 3:29 AM Diwakar Jha <[hidden email]> wrote:
|
Hi Roman,

Thanks for your reply! That was a typo; I'm using TumblingProcessingTimeWindows. My problem is that I want to stop the first-event trigger (per key) in every window except the first. Right now, the first event is triggered in every window. Will checking that the "state (firstSeen) value is true, not just exists" also change the result per window?

Thanks!

On Tue, Feb 23, 2021 at 12:05 PM Roman Khachatryan <[hidden email]> wrote:
|
Hi Diwakar,

I'm not sure I fully understand your question. If event handling in one window depends on other windows, then TriggerContext.getPartitionedState cannot be used. Triggers don't have access to global state (only to state scoped to the key and window). If that's what you want, please consider ProcessWindowFunction [1], where you can use context.globalState() in your process function.

[1]

Regards,
Roman

On Tue, Feb 23, 2021 at 3:29 AM Diwakar Jha <[hidden email]> wrote:
|
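The scoping point in the reply above can be illustrated with a small plain-Java model. This is only a sketch and does not use the Flink API: the class StateScopeModel, the Event type and both method names are hypothetical, and the 60-second tumbling window is taken from the original post. It compares a "first seen" flag kept per key and window (the scope that trigger state has) with a flag kept per key only (the scope that global state would have).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java model, no Flink dependency. All names here are hypothetical.
class StateScopeModel {

    // A record with a key, an event timestamp in milliseconds and a label.
    static final class Event {
        final String key;
        final long timestampMs;
        final String label;

        Event(String key, long timestampMs, String label) {
            this.key = key;
            this.timestampMs = timestampMs;
            this.label = label;
        }
    }

    // Assumed window size: 1 minute, as in the original post.
    static final long WINDOW_SIZE_MS = 60_000L;

    // Start of the tumbling window that contains the given timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Flag scoped to (key, window): a new window starts with an empty flag,
    // so the first event of every window fires instantly.
    static List<String> firedInstantlyWithWindowScopedState(List<Event> events) {
        Set<String> seen = new HashSet<>();
        List<String> fired = new ArrayList<>();
        for (Event e : events) {
            String scope = e.key + "@" + windowStart(e.timestampMs);
            if (seen.add(scope)) {
                fired.add(e.label);
            }
        }
        return fired;
    }

    // Flag scoped to the key only: it survives across windows,
    // so only the very first event of the key fires instantly.
    static List<String> firedInstantlyWithGlobalState(List<Event> events) {
        Set<String> seen = new HashSet<>();
        List<String> fired = new ArrayList<>();
        for (Event e : events) {
            if (seen.add(e.key)) {
                fired.add(e.label);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event("k", 1_000L, "record 1"),    // window-1
            new Event("k", 50_000L, "record 2"),   // window-1
            new Event("k", 61_000L, "record 3"),   // window-2
            new Event("k", 70_000L, "record 4"),   // window-2
            new Event("k", 110_000L, "record 5")); // window-2

        System.out.println(firedInstantlyWithWindowScopedState(events)); // prints [record 1, record 3]
        System.out.println(firedInstantlyWithGlobalState(events));       // prints [record 1]
    }
}
```

With the window-scoped flag, record 1 and record 3 both fire instantly, which is the current output described in the thread. With the per-key flag, only record 1 does, which is the expected output.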
Hi Diwakar,

the issue is that you FIRE_AND_PURGE the state. You should just FIRE on the first element (or else you lose the information that you already received the element). You'd use FIRE_AND_PURGE on the last element, though.

On Wed, Feb 24, 2021 at 7:16 AM Khachatryan Roman <[hidden email]> wrote:
|
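The difference between the two trigger results mentioned above can be sketched with a small plain-Java model. It does not use the Flink API: the class FirePurgeModel, the local Result enum (named after the Flink trigger results it imitates) and the WindowBuffer type are hypothetical. The one assumption carried over from Flink is that FIRE emits the window contents and keeps them, while FIRE_AND_PURGE emits them and then clears them.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model, no Flink dependency. All names here are hypothetical.
class FirePurgeModel {

    // Local stand-in for the trigger results discussed in the thread.
    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    // Holds the records of one window and records every emission.
    static final class WindowBuffer {
        private final List<String> contents = new ArrayList<>();
        private final List<List<String>> emitted = new ArrayList<>();

        // Add a record, then apply the trigger's decision for it.
        void add(String record, Result result) {
            contents.add(record);
            apply(result);
        }

        // FIRE emits a snapshot and keeps the contents;
        // FIRE_AND_PURGE emits a snapshot and clears the contents.
        void apply(Result result) {
            if (result == Result.CONTINUE) {
                return;
            }
            emitted.add(new ArrayList<>(contents));
            if (result == Result.FIRE_AND_PURGE) {
                contents.clear();
            }
        }

        List<List<String>> emitted() {
            return emitted;
        }
    }

    // Replays window-2 from the thread with the given result on its first record.
    static List<List<String>> run(Result onFirstRecord) {
        WindowBuffer window = new WindowBuffer();
        window.add("record 3", onFirstRecord);
        window.add("record 4", Result.CONTINUE);
        window.add("record 5", Result.CONTINUE);
        window.apply(Result.FIRE_AND_PURGE); // end of the window
        return window.emitted();
    }

    public static void main(String[] args) {
        System.out.println(run(Result.FIRE));
        // prints [[record 3], [record 3, record 4, record 5]]
        System.out.println(run(Result.FIRE_AND_PURGE));
        // prints [[record 3], [record 4, record 5]]
    }
}
```

In this model, FIRE on the first record makes that record appear in both emissions, while FIRE_AND_PURGE splits the window into two disjoint emissions. Neither variant by itself suppresses the early emission in later windows; that depends on where the "first seen" information is stored.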
Hi Arvid,

Thanks. I tried FIRE instead of FIRE_AND_PURGE and it introduced duplicates, though the result is otherwise the same, i.e. record 1 is fired both at the start and at the end of the window. So for every window, the first event of the window appears twice in the output.

Let me try to explain the desired behaviour again; hopefully it becomes clear. All the records have the same key.

Current output:
record 1 : first event in window-1 : fired

Expected output:
record 1 : first event in window-1 : fired

I think my problem is how to store keyBy values between windows. For example, I want to retain the key for 1 day. In that case, record 1 is fired instantly, and all other records (with the same key as record 1) are always grouped in each window (say 1 min) instead of firing instantly.

Thanks!

On Wed, Feb 24, 2021 at 6:19 AM Arvid Heise <[hidden email]> wrote:
|
Hello,

I tried using ProcessWindowFunction since it gives access to global state through the context. My question is: is it possible to discard single events inside the process function of ProcessWindowFunction, just like in onElement of triggers? For my use case it seems that a trigger is not sufficient, but I want to know how I can do it using ProcessWindowFunction. I'd appreciate any pointers.

Thanks!

On Wed, Feb 24, 2021 at 10:50 AM Diwakar Jha <[hidden email]> wrote:
|
Hi,

Yes, you have an Iterable with the window elements as the ProcessWindowFunction input. You can then emit them individually.

Regards,
Roman

On Thu, Feb 25, 2021 at 7:22 AM Diwakar Jha <[hidden email]> wrote:
|