Hello all,

I am receiving a lot of user events in a stream. There are different types of events, and I want to build some aggregation metrics per user by grouping the events into buckets. My windowing conditions are:

1. Start the window for the user when an event with event_name: "search" arrives for that user.
2. Trigger the window when either 30 mins have passed since the window started OR an event with event_type: "start" appears.

After that, I want to calculate some aggregations on the events in that window. I know this can be done with a process function, but I am stuck on creating a window whose trigger has multiple conditions. Please help me create this type of window, which triggers either on time or when a specific event arrives.
A session window is defined by a gap of inactivity, and I do not have that requirement. Starting the window only on the "search" event is a part I will handle later; in the first phase, let's say I want to start the window on any event that arrives for the user. For example:

Scenario 1:
t1 ---- user1 ---- event1 (window starts)
t1 + 5 mins ---- user1 ---- event2
t1 + 10 mins ---- user1 ---- event3
t1 + 15 mins ---- user1 ---- event4 === "start" type event (terminate the window because a "start" event arrived, and calculate the aggregate on the events collected above)
t1 + 16 mins ---- user1 ---- event5 (starts a new window)

Scenario 2:
t1 ---- user1 ---- event1 (window starts)
t1 + 5 mins ---- user1 ---- event2
t1 + 10 mins ---- user1 ---- event3
t1 + 30 mins ---- user1 ---- event4 (terminate the window because 30 mins have elapsed, and calculate the aggregate on the events collected above)
t1 + 31 mins ---- user1 ---- event5 (starts a new window)

This is what I want to implement. I have tried to read about triggers, but I do not understand how to fire when either the time has passed or an event with event_type == "start" has arrived. Which functions of the Trigger class do I have to implement, and how do I check these two conditions as each event arrives? Please help me implement this; if you can, provide a basic starting function that I need to implement, as I am not clear on how to start.

On Thu, May 21, 2020 at 4:59 PM Jiayi Liao <[hidden email]> wrote:
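The two scenarios above can be captured by a small, framework-free Java model. It is a sketch under stated assumptions, not Flink code: events are given as minute offsets from t1, any event opens a window (the first-phase simplification), the event that closes a window is counted in it, and the non-"start" event type used in the example ("click") is hypothetical. Because the 30-minute condition is only checked when an event arrives, this rule on its own cannot close a window for a user who goes quiet.

```java
import java.util.ArrayList;
import java.util.List;

// One event of the scenarios: minutes since t1, an event name and an event type.
final class ScenarioEvent {
    final long minute;
    final String name;
    final String type;

    ScenarioEvent(long minute, String name, String type) {
        this.minute = minute;
        this.name = name;
        this.type = type;
    }
}

// Event-driven model of the first-phase rule: any event opens a window, and an
// event closes it (and is counted in it) if its type is "start" or if it arrives
// 30 or more minutes after the window opened. Assumption, not a Flink API.
final class EventDrivenWindows {
    static final long WINDOW_MINUTES = 30;

    // Returns the closed windows in order, each as the names of its events.
    // A window that is still open at the end of the input is not returned.
    static List<List<String>> split(List<ScenarioEvent> events) {
        List<List<String>> closed = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long windowStart = 0;
        for (ScenarioEvent e : events) {
            if (current.isEmpty()) {
                windowStart = e.minute; // this event opens a new window
            }
            current.add(e.name);
            boolean startEvent = "start".equals(e.type);
            boolean timeUp = e.minute - windowStart >= WINDOW_MINUTES;
            if (startEvent || timeUp) {
                closed.add(current); // the aggregate would be computed here
                current = new ArrayList<>();
            }
        }
        return closed;
    }
}
```

For both scenarios, `split` returns one closed window containing event1 to event4, while event5 sits in a new window that is still open.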
Hi,
To achieve what you have in mind, I think you need to use a processing-time window of 30 mins together with a custom trigger that matches the "start" event in the `onElement` method and returns TriggerResult.FIRE_AND_PURGE. That way, the window fires either when the processing time has passed or when the start event is received.

Cheers,
Gordon
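The trigger logic suggested above can be modeled without Flink as follows. `Decision` is a local stand-in for the two `TriggerResult` values mentioned (it is not Flink's class), and `WindowBuffer` is a hypothetical stand-in for one window's contents; the element is added before the trigger is consulted, so the "start" event is part of what fires. In a real job the same decisions would come from a custom `Trigger`'s `onElement` and `onProcessingTime` methods, with a processing-time timer registered for the window end.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in mirroring the two TriggerResult values used here (not Flink's enum).
enum Decision { CONTINUE, FIRE_AND_PURGE }

// Model of the custom trigger: fire and purge on a "start" event or when the
// processing-time timer at the end of the window fires.
final class StartOrTimeoutTriggerModel {
    Decision onElement(String eventType) {
        return "start".equals(eventType) ? Decision.FIRE_AND_PURGE : Decision.CONTINUE;
    }

    Decision onProcessingTime() {
        return Decision.FIRE_AND_PURGE;
    }
}

// Model of a single window's contents driven by the trigger above.
final class WindowBuffer {
    private final StartOrTimeoutTriggerModel trigger = new StartOrTimeoutTriggerModel();
    private final List<String> contents = new ArrayList<>();
    final List<List<String>> fired = new ArrayList<>();

    void add(String name, String type) {
        contents.add(name);               // the element joins the window first
        apply(trigger.onElement(type));   // then the trigger decides
    }

    void timerFires() {
        apply(trigger.onProcessingTime());
    }

    private void apply(Decision d) {
        if (d == Decision.FIRE_AND_PURGE && !contents.isEmpty()) {
            fired.add(new ArrayList<>(contents)); // emit (aggregate) the contents
            contents.clear();                     // purge
        }
    }
}
```

Note that after a FIRE_AND_PURGE caused by a "start" event, later events still land in the same 30-minute window and fire at its end; the "start" event does not open a fresh 30-minute window of its own.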
I was also thinking of using a processing-time window, but that will not work for me. I want the window to start when the user's "search" event arrives, so each user's window starts from their own search event. A tumbling window has fixed start and end times, so it is not suitable in my case.

On Fri, May 22, 2020 at 10:23 AM Tzu-Li (Gordon) Tai <[hidden email]> wrote:
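The fixed-boundary objection above is easy to see with the arithmetic for an epoch-aligned tumbling window (offset 0), which, as far as I know, is how Flink's tumbling assigners place windows by default. In this sketch, timestamps are milliseconds since midnight purely for readability, and the 10:17 search time is a made-up example.

```java
// Epoch-aligned tumbling windows: every window starts at a multiple of its size,
// regardless of when a user's first ("search") event arrives.
final class TumblingAlignment {
    static final long MINUTE_MS = 60_000L;

    // Start of the window containing the timestamp; assumes timestampMs >= 0.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Time left in that window after the timestamp.
    static long remainingMs(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs - timestampMs;
    }
}
```

With a 30-minute size, a search at 10:17 falls into the window [10:00, 10:30), leaving only 13 minutes, instead of a window [10:17, 10:47) anchored at the search event.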
Another option I am considering is to use a RichFlatMapFunction with keyed state to build this logic. Is that the correct approach?

On Fri, May 22, 2020 at 4:52 PM aj <[hidden email]> wrote:
Hi,

First, sorry, I'm not an expert on windows, so please correct me if I'm wrong. From my side, it seems the assigner might also be a problem in addition to the trigger: currently Flink window assigners are all based on time (processing time or event time), and it might be hard to implement an event-driven window assigner that starts assigning elements to a window only after certain elements have been received.

A possible alternative is to use the low-level KeyedProcessFunction directly: when the "search" event is received, register a timer 30 mins later and write the time of the search event into state. The following events are then saved to state as well, since the flag is set. When the "start" event is received or the timer fires, load all the events from state, do the aggregation, and cancel the timer if the aggregation was triggered by the "start" event.

A simpler example is [1]. It does not stop the aggregation when a special event is received, but it seems that logic could be added to it.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example

Best,
Yun
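The KeyedProcessFunction design described above can be sketched as a framework-free model, assuming processing time and a hypothetical event shape (user, time, type, name); `SearchSessionModel` and `advanceProcessingTime` are illustrative names, not Flink APIs. The maps stand in for per-key state (in Flink, `ListState` for the buffered events and a `ValueState<Long>` for the timer timestamp), and the `TreeMap` stands in for the timer service. In a real `KeyedProcessFunction`, registration and cancellation would go through `ctx.timerService().registerProcessingTimeTimer(...)` and `deleteProcessingTimeTimer(...)`, and the timeout branch would live in `onTimer`. Dropping events that arrive before a "search" is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SearchSessionModel {
    static final long WINDOW_MS = 30L * 60 * 1000;

    // Per-user "keyed state": buffered event names and the registered timer timestamp.
    private final Map<String, List<String>> buffered = new HashMap<>();
    private final Map<String, Long> timerOf = new HashMap<>();
    // Model of a processing-time timer service: fire time -> users.
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();
    final List<String> emitted = new ArrayList<>();

    void processElement(String user, long nowMs, String type, String name) {
        List<String> events = buffered.get(user);
        if (events == null) {
            if (!"search".equals(type)) {
                return; // no open window: ignore events until a "search" arrives
            }
            events = new ArrayList<>();
            buffered.put(user, events);
            long fireAt = nowMs + WINDOW_MS; // register a timer 30 mins later
            timerOf.put(user, fireAt);
            timers.computeIfAbsent(fireAt, t -> new ArrayList<>()).add(user);
        }
        events.add(name);
        if ("start".equals(type)) {
            // Cancel the pending timer (needs the exact registered timestamp).
            Long fireAt = timerOf.get(user);
            List<String> users = timers.get(fireAt);
            users.remove(user);
            if (users.isEmpty()) {
                timers.remove(fireAt);
            }
            emit(user, "start");
        }
    }

    // Fires every timer due at or before nowMs, like onTimer callbacks.
    void advanceProcessingTime(long nowMs) {
        while (!timers.isEmpty() && timers.firstKey() <= nowMs) {
            Map.Entry<Long, List<String>> due = timers.pollFirstEntry();
            for (String user : due.getValue()) {
                emit(user, "timeout");
            }
        }
    }

    // Aggregates (here: just reports) the buffered events and clears the state.
    private void emit(String user, String reason) {
        List<String> events = buffered.remove(user);
        timerOf.remove(user);
        emitted.add(user + " " + reason + " " + events);
    }
}
```

Keeping the timer timestamp in state matters because cancelling a timer requires the exact timestamp it was registered with. The timeout path also fires without any further event for that user, which is what a time-difference check on incoming events cannot do.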
Hi,

I have implemented a flatMap-based solution and it's working fine, but the biggest problem is that if no further event arrives for the user after 30 mins, I cannot trigger, because I only check the time difference when a new event comes in. So the window is triggered only when the next event arrives, but I want it to trigger right after 30 mins. Please help me improve this and solve the problem.
On Sun, May 24, 2020 at 10:57 PM Yun Gao <[hidden email]> wrote:
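Hi,

I think you could use a timer to achieve that. In a ProcessFunction you can register a timer for a specific time (event time or processing time) and get a callback at that point. It can be registered through the context's TimerService, for example `ctx.timerService().registerProcessingTimeTimer(...)` or `ctx.timerService().registerEventTimeTimer(...)`.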
More details on timers can be found in [1], and an example is in [2]. In that example, a timer is registered in the last line of the processElement method, and the callback is implemented by overriding the onTimer method.

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#timers
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#example
Thanks, Yun. I have converted the code to use a KeyedProcessFunction instead of a flatMap, and with a registered timer it works.

On Fri, May 29, 2020 at 11:13 AM Yun Gao <[hidden email]> wrote: