Custom Trigger + SQL Pattern


Custom Trigger + SQL Pattern

shkob1
Hey!

I have a use case in which I'm grouping a stream by session id - so far
pretty standard. Note that I need to do it through SQL and not through the
Table API.
In my use case I have two trigger conditions though: one is time based
(session inactivity), the other is based on a specific event marked as a
"last" event.
AFAIK SQL does not support custom triggers, so what I end up doing is the
group by in SQL, then converting the result to a stream along with a
boolean field that marks whether at least one of the events was the end
event, and then adding my custom trigger on top of it.
It looks something like this:

Table result = tableEnv.sqlQuery(
        "SELECT atLeastOneTrue(lastEvent), sessionId, COUNT(*) FROM source GROUP BY sessionId");

tableEnv.toRetractStream(result, Row.class, streamQueryConfig)
        .filter(tuple -> tuple.f0)   // keep only the accumulate (non-retract) records
        .map(...)
        .returns(...)
        .keyBy("sessionId")
        .window(EventTimeSessionWindows.withGap(Time.hours(4)))
        .trigger(new SessionEndedByTimeOrEndTrigger())
        .process(...take the last element from the group-by result...)

This seems like a weird workaround though, doesn't it? My window is basically
over the SQL result rather than over the source stream. Ideally I would keyBy
the sessionId before running the SQL, but then a) would I need to register a
table per key? b) would I be able to use the custom trigger per window?

Basically I want to group by session id and have a window for every session
that supports both the time trigger and the custom trigger. Assuming I need
to use SQL (the reason is that the query is dynamically loaded), is there a
better solution for this?
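
For reference, the custom trigger is roughly along these lines (a simplified
sketch only - SessionRecord and isEndOfSession() are placeholders for the
mapped element type and the boolean flag I elided above):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class SessionEndedByTimeOrEndTrigger extends Trigger<SessionRecord, TimeWindow> {

    @Override
    public TriggerResult onElement(SessionRecord element, long timestamp,
                                   TimeWindow window, TriggerContext ctx) throws Exception {
        // keep a timer at the window end so the session-inactivity gap can fire
        ctx.registerEventTimeTimer(window.maxTimestamp());
        // fire immediately when the explicit "last" event shows up
        return element.isEndOfSession() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public boolean canMerge() {
        return true; // needed because EventTimeSessionWindows merges windows
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        ctx.registerEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}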

Re: Custom Trigger + SQL Pattern

Hequn Cheng
Hi shkob1,

> while one is time (session inactivity) the other is based on a specific event marked as a "last" event.
How about using a session window and a UDTF[1] to solve the problem? The session window may output multiple `last` elements, but we can use a UDTF to split them into single ones. That way we can use SQL for the whole job.

Best, Hequn.


Re: Custom Trigger + SQL Pattern

shkob1
I'm wondering how that would work - it seems that a table function still takes
a single row's values as input, am I wrong (or at least that's what the
examples show)?
What would the SQL look like?

Re: Custom Trigger + SQL Pattern

Hequn Cheng
Hi Shahar,

The table function takes a single row but can output multiple rows. You can split the row based on the "last" event. The code looks like:

    val sessionResult =
      "SELECT " +
        "  lastUDAF(line) AS lastEvents " +
        "FROM MyTable " +
        "GROUP BY SESSION(rowtime, INTERVAL '4' HOUR)"
    val result =
      s"SELECT lastEvent FROM ($sessionResult), LATERAL TABLE(splitUDTF(lastEvents)) as T(lastEvent)"

The lastUDAF is used to process the data in a session window. As your last event is based on either the window end or a special "last" event, the lastUDAF outputs multiple last events.
After the window, we apply the splitUDTF to split the lastEvents into multiple single events.
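
For example, the two functions could look roughly like this (only a sketch -
here the events are plain strings, and the delimiter and the check for the
"last" flag are placeholders for your actual format):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableFunction;

// Collects the candidate "last" events of one session window into a single delimited string.
public class LastUDAF extends AggregateFunction<String, LastUDAF.Acc> {

    public static class Acc {
        public List<String> lastEvents = new ArrayList<>();
        public String latest; // most recent event, used if the session only ends by inactivity
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    public void accumulate(Acc acc, String line) {
        acc.latest = line;
        if (line.contains("\"last\":true")) { // placeholder check for the explicit end event
            acc.lastEvents.add(line);
        }
    }

    @Override
    public String getValue(Acc acc) {
        List<String> out = new ArrayList<>(acc.lastEvents);
        if (out.isEmpty() && acc.latest != null) {
            out.add(acc.latest); // no explicit end event, so the window end decides
        }
        return String.join("|", out);
    }
}

// Splits the delimited string back into one row per last event.
public class SplitUDTF extends TableFunction<String> {
    public void eval(String lastEvents) {
        for (String event : lastEvents.split("\\|")) {
            collect(event);
        }
    }
}

Both are registered before running the query, e.g.
tableEnv.registerFunction("lastUDAF", new LastUDAF()) and
tableEnv.registerFunction("splitUDTF", new SplitUDTF()).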

Best, Hequn


Re: Custom Trigger + SQL Pattern

shkob1
Thanks for the answer Hequn!

To be honest I'm still trying to wrap my head around this solution, and also
trying to figure out whether it has advantages over mine.
My original thought was that my design is "backwards", because logically I
would want to (roughly the pipeline sketched below):
  1. collect raw records
  2. partition them by session id into windows
  3. wait for the session to end (= accumulate)
  4. run the group by & aggregation on the ended session's rows and emit the result
My solution though (and I assume yours too) instead runs the aggregation for
every incoming record, which seems wasteful. It also doesn't save any state
storage, since it's emitting to a retract stream anyway.
What do you think?
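
Just to illustrate what I mean, the "logical" version would look roughly like
this on the raw stream (pure DataStream sketch, ignoring for a moment that
the query has to be SQL; Event, SessionSummary and aggregate(..) are made-up
names):

// rawEvents: DataStream<Event> of the collected raw records (step 1)
rawEvents
    .keyBy(Event::getSessionId)                               // 2. partition by session id
    .window(EventTimeSessionWindows.withGap(Time.hours(4)))   // 3. wait for the session to end...
    .trigger(new SessionEndedByTimeOrEndTrigger())            //    ...or for the explicit end event
    .process(new ProcessWindowFunction<Event, SessionSummary, String, TimeWindow>() {
        @Override
        public void process(String sessionId, Context ctx,
                            Iterable<Event> events, Collector<SessionSummary> out) {
            out.collect(aggregate(sessionId, events));        // 4. aggregate once, when the session ends
        }
    });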

Shahar



Re: Custom Trigger + SQL Pattern

shkob1
Following up on the actual question - is there a way to register a KeyedStream
as a table (or tables) and have a trigger per key?


