Hey!
I have a use case in which I'm grouping a stream by session id - so far pretty standard. Note that I need to do it through SQL and not through the Table API. In my use case I have two trigger conditions though: one is time based (session inactivity), the other is based on a specific event marked as the "last" event.

AFAIK SQL does not support custom triggers, so what I end up doing is the GROUP BY in SQL, then converting the result to a stream along with a boolean field that marks whether at least one of the events was the end event, and then adding my custom trigger on top of it. It looks something like this:

    Table result = tableEnv.sqlQuery(
        "SELECT atLeastOneTrue(lastEvent), sessionId, COUNT(*) FROM source GROUP BY sessionId");
    tableEnv.toRetractStream(result, Row.class, streamQueryConfig)
        .filter(tuple -> tuple.f0)
        .map(...)
        .returns(...)
        .keyBy("sessionId")
        .window(EventTimeSessionWindows.withGap(Time.hours(4)))
        .trigger(new SessionEndedByTimeOrEndTrigger())
        .process(...take last element from the group by result..)

This seems like a weird workaround, doesn't it? My window is basically over the SQL result rather than over the source stream. Ideally I would keyBy the sessionId before running the SQL, but then a) would I need to register a table per key? b) would I be able to use the custom trigger per window?

Basically I want to group by session id and have a window for every session that supports both time and a custom trigger. Assuming I need to use SQL (the reason is that the query is dynamically loaded), is there a better solution for it?
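For completeness, the SessionEndedByTimeOrEndTrigger is roughly along these lines (just a sketch - SessionAggregate is a placeholder for whatever my map(...) step actually emits, carrying the sessionId and the atLeastOneTrue(lastEvent) flag):

    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    public class SessionEndedByTimeOrEndTrigger extends Trigger<SessionAggregate, TimeWindow> {

        @Override
        public TriggerResult onElement(SessionAggregate element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            if (element.endEventSeen) {
                // an end event was part of the session -> close the window immediately
                return TriggerResult.FIRE_AND_PURGE;
            }
            // otherwise rely on the session-gap timer
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            // fire when the watermark passes the end of the (possibly merged) session window
            return time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public boolean canMerge() {
            return true; // EventTimeSessionWindows merge windows, so the trigger must support merging
        }

        @Override
        public void onMerge(TimeWindow window, OnMergeContext ctx) {
            ctx.registerEventTimeTimer(window.maxTimestamp());
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) {
            ctx.deleteEventTimeTimer(window.maxTimestamp());
        }
    }

where SessionAggregate is something like:

    public class SessionAggregate {
        public String sessionId;
        public boolean endEventSeen; // the atLeastOneTrue(lastEvent) result
        public long cnt;
    }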
Hi shkob1,

> while one is time (session inactivity) the other is based on a specific event marked as a "last" event.

How about using a session window and a udtf[1] to solve the problem? The session window may output multiple "last" elements, but we can use a udtf to split them into single ones. That way we can use SQL for the whole job.

Best,
Hequn
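For example, the session window part can be expressed with a group window in SQL (a sketch - it assumes the source table exposes an event-time attribute, here called rowtime):

    Table result = tableEnv.sqlQuery(
        "SELECT sessionId, COUNT(*) AS cnt " +
        "FROM source " +
        "GROUP BY sessionId, SESSION(rowtime, INTERVAL '4' HOUR)");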
I'm wondering how that works - it seems that a table function still takes a single row's values as input, am I wrong (or at least that is what the examples show)? What would the SQL look like?
Hi Shahar,

The table function takes a single row but can output multiple rows. You can split the row based on the "last" event. The code looks like:

    val sessionResult =

The lastUDAF is used to process data in a session window. Since your lastEvent is based on either the window end or a special "last" event, the lastUDAF outputs multiple last events. After the window, we apply a splitUDTF to split the lastEvents into single events.

Best,
Hequn
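In rough shape, the two-step query might look like this (a sketch only - LastUDAF and SplitUDTF stand for user-defined functions you would implement and register yourself, eventPayload is a placeholder for the column that carries the event, and rowtime is assumed to be the event-time attribute of the source table):

    // register the user-defined functions
    tableEnv.registerFunction("lastUDAF", new LastUDAF());   // AggregateFunction that collects the session's "last" event(s)
    tableEnv.registerFunction("splitUDTF", new SplitUDTF()); // TableFunction that splits the collected events into single rows

    String sessionResult =
        "SELECT sessionId, lastUDAF(eventPayload) AS lastEvents " +
        "FROM source " +
        "GROUP BY sessionId, SESSION(rowtime, INTERVAL '4' HOUR)";

    Table result = tableEnv.sqlQuery(
        "SELECT t.sessionId, T.event " +
        "FROM (" + sessionResult + ") t, LATERAL TABLE(splitUDTF(t.lastEvents)) AS T(event)");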
Thanks for the answer Hequn! To be honest I'm still trying to wrap my head around this solution, and trying to figure out whether it has advantages over mine. My original thought was that my design is "backwards" because logically I would want to window and trigger the raw events first and only then run the SQL aggregation once per session. My solution though (and I assume yours too) rather runs the aggregation for every record coming in, which seems wasteful, and it doesn't have any state-storage benefit either since it's emitting to a retract stream anyway.

What do you think?

Shahar
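P.S. the only state knob I'm aware of for that non-windowed GROUP BY is the idle state retention on the streamQueryConfig, something like this (if I have the API right):

    // keep per-sessionId state for at least 4 and at most 5 hours of inactivity
    // (Time here is org.apache.flink.api.common.time.Time)
    StreamQueryConfig streamQueryConfig = tableEnv.queryConfig();
    streamQueryConfig.withIdleStateRetentionTime(Time.hours(4), Time.hours(5));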
Following up on the actual question - is there a way to register a KeyedStream as table(s) and have a trigger per key?