UDTAGG and SQL

Marco Villalobos-2
I am trying to use a user-defined table aggregate function (UDTAGG) directly in SQL so that I can combine all the rows collected in a window into one object.

GIVEN a user-defined table aggregate function

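import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

// PurchaseWindow is a user-defined class (not shown) used as both the result and the accumulator type.
public class MyUDTAGG extends TableAggregateFunction<PurchaseWindow, PurchaseWindow> {

        // Creates an empty accumulator for each group.
        public PurchaseWindow createAccumulator() {
                return new PurchaseWindow();
        }

        // Called once for every input row of the group.
        public void accumulate(PurchaseWindow acc, String name, double cost) {
                acc.add(name, cost);
        }

        // Emits the accumulated result (a table aggregate may emit zero or more rows).
        public void emitValue(PurchaseWindow acc, Collector<PurchaseWindow> out) {
                out.collect(acc);
        }
}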

THAT it is registered as

StreamTableEnvironment tEnv = ...
tEnv.registerFunction("MyUDTAGG", new MyUDTAGG());

THEN is it possible to call it in an SQL query in this manner?

SELECT MyUDTAGG(name, SUM(cost)) AS purchase_window
FROM purchases
GROUP BY TUMBLE(proactive, INTERVAL '1' DAY), name


I am receiving an SQL validation error, "No match found for function signature ...".

What am I doing wrong, or is there a better way to do this?



Re: UDTAGG and SQL

Timo Walther
Hi Marco,

Nesting aggregate functions is not allowed in SQL. The exception message
could be improved, though. I guess the planner searches for a scalar
function called `MyUDTAGG` in your example and cannot find one.

Maybe the built-in function `COLLECT` or `LISTAGG` is what you are
looking for?

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html

Regards,
Timo
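For reference, here is what the two suggested built-ins compute for a single window group, imitated in plain Java on hypothetical in-memory rows (this is not Flink code). `COLLECT(name)` returns a multiset of the values, modelled here as a value-to-count map with nulls ignored, and `LISTAGG(name)` concatenates the values with `,` as the default separator. The `Purchase` class and the sample data are assumptions for illustration. Note that each function takes exactly one column, so `cost` is simply dropped.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical input row; only `name` is passed to COLLECT/LISTAGG here.
final class Purchase {
    final String name;
    final double cost;

    Purchase(String name, double cost) {
        this.name = name;
        this.cost = cost;
    }
}

class SingleColumnAggregates {

    // Imitates COLLECT(name): a multiset, modelled as value -> count; nulls are ignored.
    static Map<String, Integer> collect(List<Purchase> group) {
        Map<String, Integer> multiset = new LinkedHashMap<>();
        for (Purchase p : group) {
            if (p.name != null) {
                multiset.merge(p.name, 1, Integer::sum);
            }
        }
        return multiset;
    }

    // Imitates LISTAGG(name) with the default ',' separator; nulls are skipped in this sketch.
    static String listagg(List<Purchase> group) {
        return group.stream()
                .map(p -> p.name)
                .filter(Objects::nonNull)
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        List<Purchase> window = List.of(
                new Purchase("apple", 1.5),
                new Purchase("pear", 2.0),
                new Purchase("apple", 0.5));
        System.out.println(collect(window)); // {apple=2, pear=1}
        System.out.println(listagg(window)); // apple,pear,apple
    }
}
```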


On 05.01.21 14:45, Marco Villalobos wrote:


Re: UDTAGG and SQL

Marco Villalobos-2
Hi Timo,

Thank you for the quick response.

Neither COLLECT nor LISTAGG works, because each accepts only one column.

I am trying to collect all the rows and columns into one object, for example a List<Row>.
Later, I need to perform calculations on all the rows that were collected within a window.

Maybe I need to use a subquery, i.e., SELECT FROM (SELECT FROM)?
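The subquery idea amounts to a two-level aggregation: the inner query computes `SUM(cost)` per window and `name`, and the outer query groups those partial rows by window again and combines them into one object. The sketch below simulates both levels in plain Java on hypothetical rows, with an integer `day` standing in for a one-day tumbling window. The SQL in the comment is an unverified shape that assumes a processing-time attribute named `proctime` and a hypothetical single-result aggregate function `MyAGG`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Unverified SQL shape this mirrors (assumes a `proctime` attribute and a
// hypothetical aggregate function MyAGG):
//   SELECT MyAGG(name, total) FROM (
//     SELECT TUMBLE_PROCTIME(proctime, INTERVAL '1' DAY) AS w, name, SUM(cost) AS total
//     FROM purchases
//     GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), name)
//   GROUP BY TUMBLE(w, INTERVAL '1' DAY)

// Hypothetical input row; `day` stands in for a one-day tumbling window.
final class Purchase {
    final int day;
    final String name;
    final double cost;

    Purchase(int day, String name, double cost) {
        this.day = day;
        this.name = name;
        this.cost = cost;
    }
}

// One row of the inner query's result: (window, name, SUM(cost)).
final class NameTotal {
    final int day;
    final String name;
    final double total;

    NameTotal(int day, String name, double total) {
        this.day = day;
        this.name = name;
        this.total = total;
    }
}

class TwoLevelAggregation {

    // Inner query: GROUP BY window, name with SUM(cost).
    static List<NameTotal> inner(List<Purchase> rows) {
        Map<Integer, Map<String, Double>> sums = new TreeMap<>();
        for (Purchase p : rows) {
            sums.computeIfAbsent(p.day, d -> new TreeMap<>()).merge(p.name, p.cost, Double::sum);
        }
        List<NameTotal> out = new ArrayList<>();
        sums.forEach((day, perName) ->
                perName.forEach((name, total) -> out.add(new NameTotal(day, name, total))));
        return out;
    }

    // Outer query: GROUP BY window again, combining all (name, total) rows into one object.
    static Map<Integer, Map<String, Double>> outer(List<NameTotal> rows) {
        Map<Integer, Map<String, Double>> perWindow = new TreeMap<>();
        for (NameTotal r : rows) {
            perWindow.computeIfAbsent(r.day, d -> new LinkedHashMap<>()).put(r.name, r.total);
        }
        return perWindow;
    }

    public static void main(String[] args) {
        List<Purchase> rows = List.of(
                new Purchase(1, "apple", 1.5),
                new Purchase(1, "pear", 2.0),
                new Purchase(1, "apple", 0.5),
                new Purchase(2, "pear", 4.0));
        System.out.println(outer(inner(rows))); // {1={apple=2.0, pear=2.0}, 2={pear=4.0}}
    }
}
```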

On Jan 5, 2021, at 6:10 AM, Timo Walther wrote:

Re: UDTAGG and SQL

Timo Walther
A subquery could work, but since you want to implement a UDTAGG anyway,
you can also move those calculations into the function and keep the SQL
query simple. It is up to you. Consecutive windows (windowing the result
of another windowed query) are supported.

Regards,
Timo
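A minimal sketch of keeping the SQL simple and putting the per-window calculation into the function itself. It is plain Java, not the Flink API: `PurchaseWindow` (whose real definition is not shown in the thread), `mostExpensive()`, and the integer `day` standing in for a one-day tumbling window are all hypothetical. The loop only imitates how the runtime creates one accumulator per window, calls `accumulate` for each row, and produces a result when the window fires. As far as I know, in Flink 1.12 a `TableAggregateFunction` can only be called from the Table API (via `flatAggregate`), not from SQL; for one result per window in SQL, a regular `AggregateFunction` with the same accumulator seems the closer fit, called on the raw columns, e.g. a hypothetical `MyAGG(name, cost)` instead of `MyUDTAGG(name, SUM(cost))`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical input row; `day` stands in for a one-day tumbling window.
final class Purchase {
    final int day;
    final String name;
    final double cost;

    Purchase(int day, String name, double cost) {
        this.day = day;
        this.name = name;
        this.cost = cost;
    }
}

// Hypothetical version of the PurchaseWindow accumulator: it keeps every
// (name, cost) pair of the window, similar to the List<Row> idea.
final class PurchaseWindow {
    private static final class Item {
        final String name;
        final double cost;

        Item(String name, double cost) {
            this.name = name;
            this.cost = cost;
        }
    }

    private final List<Item> items = new ArrayList<>();

    void add(String name, double cost) {
        items.add(new Item(name, cost));
    }

    // The "later calculation", moved into the function itself.
    // Example only: the name of the most expensive purchase (first one wins on ties).
    String mostExpensive() {
        Item best = null;
        for (Item i : items) {
            if (best == null || i.cost > best.cost) {
                best = i;
            }
        }
        return best == null ? null : best.name;
    }
}

class WindowedAggregation {

    // Imitates the per-window lifecycle of an aggregate function: a fresh
    // accumulator per window, one add() per row, one result per window.
    static Map<Integer, String> run(List<Purchase> rows) {
        Map<Integer, PurchaseWindow> accumulators = new TreeMap<>();
        for (Purchase p : rows) {
            accumulators.computeIfAbsent(p.day, d -> new PurchaseWindow()).add(p.name, p.cost);
        }
        Map<Integer, String> results = new TreeMap<>();
        accumulators.forEach((day, acc) -> results.put(day, acc.mostExpensive()));
        return results;
    }

    public static void main(String[] args) {
        List<Purchase> rows = List.of(
                new Purchase(1, "apple", 1.0),
                new Purchase(1, "pear", 3.0),
                new Purchase(2, "apple", 5.0));
        System.out.println(run(rows)); // {1=pear, 2=apple}
    }
}
```

With this split, the query itself only groups and calls the function once per window; any later change to the calculation stays inside the accumulator.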


On 05.01.21 15:23, Marco Villalobos wrote:


Re: UDTAGG and SQL

Marco Villalobos-2
Hi Timo,

Can you please elaborate a bit on what you mean? I am not sure that I completely understand.

Thank you.

Sincerely,

Marco A. Villalobos

On Tue, Jan 5, 2021 at 6:58 AM Timo Walther wrote: