I am trying to use a user-defined table aggregate function directly in SQL so that I can combine all the rows collected in a window into one object.
GIVEN a user-defined table aggregate function

    import org.apache.flink.table.functions.TableAggregateFunction;
    import org.apache.flink.util.Collector;

    public class MyUDTAGG extends TableAggregateFunction<PurchaseWindow, PurchaseWindow> {

        public PurchaseWindow createAccumulator() {
            return new PurchaseWindow();
        }

        public void accumulate(PurchaseWindow acc, String name, double cost) {
            acc.add(name, cost);
        }

        public void emitValue(PurchaseWindow acc, Collector<PurchaseWindow> out) {
            out.collect(acc);
        }
    }

THAT it is registered as

    StreamTableEnvironment tEnv = ...
    tEnv.registerFunction("MyUDTAGG", new MyUDTAGG());

THEN is it possible to call it in an SQL query in this manner?

    SELECT MyUDTAGG(name, SUM(cost)) AS purchase_window
    FROM purchases
    GROUP BY TUMBLE(proactive, INTERVAL '1' DAY), name

I am receiving an SQL validation error, "No match found for function signature ...".

What am I doing wrong, or is there a better way to do this?
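The intent of the query can be modeled in plain Java, without any Flink dependency: each row is assigned to a one-day tumbling window, rows are grouped by (window start, name), and each group gets one accumulator that receives the raw `name` and `cost` of every row. `Purchase`, the fields of `PurchaseWindow`, and the epoch-millisecond timestamps are hypothetical (the real `PurchaseWindow` was not shown), and the window-start formula assumes non-negative timestamps and no window offset. This is a sketch of the semantics, not Flink's implementation. Separately, as far as I know, in Flink 1.12 a `TableAggregateFunction` could only be invoked from the Table API (via `flatAggregate`), while SQL accepts a regular `AggregateFunction`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical input row: (name, cost, timestamp in epoch milliseconds).
class Purchase {
    final String name;
    final double cost;
    final long ts;

    Purchase(String name, double cost, long ts) {
        this.name = name;
        this.cost = cost;
        this.ts = ts;
    }
}

// Hypothetical accumulator shaped like the PurchaseWindow in the question.
class PurchaseWindow {
    final List<String> names = new ArrayList<>();
    double totalCost = 0.0;

    void add(String name, double cost) {
        names.add(name);
        totalCost += cost;
    }
}

class TumbleSketch {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    // Start of the one-day tumbling window containing ts (assumes ts >= 0 and no offset).
    static long windowStart(long ts) {
        return ts - (ts % DAY_MS);
    }

    // Models GROUP BY TUMBLE(ts, INTERVAL '1' DAY), name: one accumulator per
    // (window start, name) key, fed the raw columns of every row in that group.
    static Map<String, PurchaseWindow> aggregate(List<Purchase> rows) {
        Map<String, PurchaseWindow> groups = new LinkedHashMap<>();
        for (Purchase p : rows) {
            String key = windowStart(p.ts) + "|" + p.name;
            // The first row of a group creates the accumulator; every row is then added to it.
            groups.computeIfAbsent(key, k -> new PurchaseWindow()).add(p.name, p.cost);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Purchase> rows = Arrays.asList(
                new Purchase("alice", 10.0, 1_000L),
                new Purchase("alice", 5.0, 2_000L),
                new Purchase("bob", 7.5, DAY_MS + 1L));
        // prints "0|alice -> 15.0" then "86400000|bob -> 7.5"
        for (Map.Entry<String, PurchaseWindow> e : aggregate(rows).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue().totalCost);
        }
    }
}
```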
Hi Marco,
Nesting aggregate functions is not allowed in SQL. The exception could be improved, though. I guess the planner searches for a scalar function called `MyUDTAGG` in your example and cannot find one.

Maybe the built-in function `COLLECT` or `LISTAGG` is what you are looking for?

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html

Regards,
Timo

On 05.01.21 14:45, Marco Villalobos wrote:
> [...]
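For comparison, here is a plain-Java sketch of what the two suggested built-ins compute for one group. As I understand the Flink 1.12 docs, `COLLECT(expression)` returns a MULTISET of the values of a single expression (ignoring NULLs), and `LISTAGG(expression [, separator])` concatenates string values with a separator that defaults to ','. The method names `collect` and `listAgg` below are illustrative only, not Flink API, and the multiset is modeled as a value-to-count map.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

class BuiltinAggSketch {
    // Roughly what COLLECT(expr) yields for one group: a multiset, modeled as value -> count.
    // NULLs are ignored, as the Flink docs describe for COLLECT.
    static Map<String, Integer> collect(List<String> column) {
        Map<String, Integer> multiset = new LinkedHashMap<>();
        for (String v : column) {
            if (v != null) {
                multiset.merge(v, 1, Integer::sum);
            }
        }
        return multiset;
    }

    // Roughly what LISTAGG(expr, separator) yields for one group: one concatenated string.
    // NULLs are skipped in this sketch.
    static String listAgg(List<String> column, String separator) {
        StringJoiner joiner = new StringJoiner(separator);
        for (String v : column) {
            if (v != null) {
                joiner.add(v);
            }
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // A single column ("name") of one window group.
        List<String> names = Arrays.asList("alice", "bob", "alice", null);
        System.out.println(collect(names));      // {alice=2, bob=1}
        System.out.println(listAgg(names, ",")); // alice,bob,alice
    }
}
```

Each function consumes exactly one value per row, so a row with several columns (here `name` and `cost`) does not fit either of them directly.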
Hi Timo,
Thank you for the quick response.

Neither COLLECT nor LISTAGG works because they only accept one column.

I am trying to collect all the rows and columns into one object, like a List<Row>, for example. Later, I need to make calculations on all the rows that were just collected within a window.

Maybe I need to use a subquery, i.e., SELECT ... FROM (SELECT ... FROM ...)?

> On Jan 5, 2021, at 6:10 AM, Timo Walther wrote:
> [...]
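Collecting every row of a group and computing over the whole list afterwards can be sketched in plain Java as follows. The accumulator keeps each `(name, cost)` row, and the calculation runs once when the group's result is requested, which is the role `getValue` plays in an aggregate function. `Item`, `WindowSummary`, and the statistics chosen (count, total, max) are hypothetical placeholders for the real row type and calculation. Holding every row means the accumulator grows with the number of rows in the window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for one input row (all of its columns).
class Item {
    final String name;
    final double cost;

    Item(String name, double cost) {
        this.name = name;
        this.cost = cost;
    }
}

// Hypothetical result type produced once per window.
class WindowSummary {
    final int count;
    final double total;
    final double maxCost;

    WindowSummary(int count, double total, double maxCost) {
        this.count = count;
        this.total = total;
        this.maxCost = maxCost;
    }

    @Override
    public String toString() {
        return "count=" + count + ", total=" + total + ", max=" + maxCost;
    }
}

// Accumulator that keeps every row of the group instead of a running summary.
class RowListAccumulator {
    final List<Item> rows = new ArrayList<>();

    // Called once per input row, with the raw (un-aggregated) columns.
    void accumulate(String name, double cost) {
        rows.add(new Item(name, cost));
    }

    // Called when the window result is needed: the calculation over all collected rows lives here.
    WindowSummary getValue() {
        double total = 0.0;
        double max = Double.NEGATIVE_INFINITY;
        for (Item r : rows) {
            total += r.cost;
            max = Math.max(max, r.cost);
        }
        // An empty group reports 0.0 as its max in this sketch.
        return new WindowSummary(rows.size(), total, rows.isEmpty() ? 0.0 : max);
    }
}

class CollectRowsDemo {
    public static void main(String[] args) {
        RowListAccumulator acc = new RowListAccumulator();
        for (Item i : Arrays.asList(new Item("alice", 3.0), new Item("bob", 4.5), new Item("alice", 2.5))) {
            acc.accumulate(i.name, i.cost);
        }
        System.out.println(acc.getValue()); // count=3, total=10.0, max=4.5
    }
}
```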
A subquery could work, but since you want to implement a UDTAGG anyway, you can also move the implementation there and keep the SQL query simple. But this is up to you. Consecutive windows are supported.

Regards,
Timo

On 05.01.21 15:23, Marco Villalobos wrote:
> [...]
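Consecutive (cascaded) windows can be modeled the same way: an inner stage aggregates per one-day window, and an outer stage windows the inner results again, here into 7-day windows. This plain-Java sketch drops the `name` key for brevity, aligns all windows to epoch 0, and uses each daily window's start as the timestamp of the daily result, which is an assumption of the sketch. In Flink SQL, as far as I know, the inner query would forward a time attribute with `TUMBLE_ROWTIME` (or `TUMBLE_PROCTIME`) so that the outer query can apply its own `TUMBLE`. `Event` and `CascadedWindowsSketch` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical input row: (timestamp in epoch milliseconds, cost).
class Event {
    final long ts;
    final double cost;

    Event(long ts, double cost) {
        this.ts = ts;
        this.cost = cost;
    }
}

class CascadedWindowsSketch {
    static final long DAY_MS = 24L * 60 * 60 * 1000;
    static final long WEEK_MS = 7 * DAY_MS;

    // Start of the tumbling window of the given size containing ts (assumes ts >= 0, no offset).
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Stage 1 (inner query): total cost per one-day window, keyed by window start.
    static Map<Long, Double> dailyTotals(List<Event> events) {
        Map<Long, Double> daily = new TreeMap<>();
        for (Event e : events) {
            daily.merge(windowStart(e.ts, DAY_MS), e.cost, Double::sum);
        }
        return daily;
    }

    // Stage 2 (outer query): each daily result is windowed again into 7-day windows,
    // using the daily window's start as its timestamp.
    static Map<Long, Double> weeklyTotals(Map<Long, Double> daily) {
        Map<Long, Double> weekly = new TreeMap<>();
        for (Map.Entry<Long, Double> d : daily.entrySet()) {
            weekly.merge(windowStart(d.getKey(), WEEK_MS), d.getValue(), Double::sum);
        }
        return weekly;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(1_000L, 2.0),
                new Event(DAY_MS + 5L, 3.0),
                new Event(WEEK_MS + 1L, 4.0));
        Map<Long, Double> daily = dailyTotals(events);
        System.out.println(daily);               // {0=2.0, 86400000=3.0, 604800000=4.0}
        System.out.println(weeklyTotals(daily)); // {0=5.0, 604800000=4.0}
    }
}
```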
Hi Timo,

Can you please elaborate a bit on what you mean? I am not sure that I completely understand.

Thank you.

Sincerely,
Marco A. Villalobos

On Tue, Jan 5, 2021 at 6:58 AM Timo Walther wrote:
> A subquery could work but since you want to implement a UDTAGG anyway, [...]