Registering UDAF in blink batch app

Dmytro Dragan

Hi all,

Could you please tell me how to register a custom aggregate function in a Blink batch app?

In streaming mode, we create:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

which has:

<T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction);

But in batch mode, we need to create a TableEnvironment:

EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(bsSettings);

which has no method for registering an AggregateFunction, only one for a ScalarFunction.

Details: Flink 1.10, Java API
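For comparison, here is a minimal sketch of the streaming path that works in 1.10. It assumes Flink 1.10 with the Blink planner and the Java bridge module on the classpath (in 1.10 the bridge class is `org.apache.flink.table.api.java.StreamTableEnvironment`; the package moved in 1.11). `MySum`, `SumAcc`, the view `t` and the column `v` are hypothetical names used only for illustration.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment; // Flink 1.10 package name
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;

public class StreamingUdafExample {

    // Hypothetical accumulator: public class, public field, so it can be treated as a POJO.
    public static class SumAcc {
        public long sum = 0L;
    }

    // Hypothetical UDAF that sums BIGINT values and skips NULLs.
    public static class MySum extends AggregateFunction<Long, SumAcc> {
        @Override
        public SumAcc createAccumulator() {
            return new SumAcc();
        }

        // Found by the planner via reflection, so there is no @Override here.
        public void accumulate(SumAcc acc, Long value) {
            if (value != null) {
                acc.sum += value;
            }
        }

        @Override
        public Long getValue(SumAcc acc) {
            return acc.sum;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

        // StreamTableEnvironment has the AggregateFunction overload of registerFunction.
        tableEnv.registerFunction("mysum", new MySum());

        Table input = tableEnv.fromDataStream(env.fromElements(1L, 2L, 3L), "v");
        tableEnv.createTemporaryView("t", input);
        Table result = tableEnv.sqlQuery("SELECT mysum(v) FROM t");

        // A non-windowed aggregate emits updates, so convert it to a retract stream.
        tableEnv.toRetractStream(result, Row.class).print();
        env.execute("streaming UDAF sketch");
    }
}
```

The same `MySum` class is what one would like to register on the batch `TableEnvironment`, which in 1.10 only offers the `ScalarFunction` overload.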

 

 


Re: Registering UDAF in blink batch app

Zhenghua Gao
`StreamTableEnvironment.create()` yields a `StreamTableEnvironmentImpl` object,
which has `registerFunction` overloads for ScalarFunction, TableFunction, AggregateFunction and TableAggregateFunction.

`TableEnvironment.create()` yields a `TableEnvironmentImpl` object, which is the unified entry point for Table/SQL programs.
It only has a deprecated `registerFunction` method for ScalarFunction; you should use `createTemporarySystemFunction` instead.

A workaround for the Blink planner in batch mode is to use the public constructor of `StreamTableEnvironmentImpl` to create
the TableEnvironment and then call its `registerFunction` overloads. Please make sure you pass `isStreamingMode = false`.
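A sketch of this workaround, assuming Flink 1.10 with the Blink planner on the classpath. It repeats the wiring that `StreamTableEnvironmentImpl.create()` performs internally, skips that method's streaming-only check, and passes `isStreamingMode = false`. `CatalogManager`, `FunctionCatalog`, `ModuleManager`, `ComponentFactoryService` and the planner/executor factories are internal APIs; the constructor and factory signatures below are assumed from the 1.10 sources and changed in 1.11, so please check them against the version you build with. `createBatchEnv`, `MySum` and `SumAcc` are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.Map;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.module.ModuleManager;

public class BatchStreamTableEnvWorkaround {

    public static class SumAcc {
        public long sum = 0L;
    }

    // Hypothetical UDAF used only to demonstrate registration.
    public static class MySum extends AggregateFunction<Long, SumAcc> {
        @Override public SumAcc createAccumulator() { return new SumAcc(); }
        public void accumulate(SumAcc acc, Long value) { if (value != null) acc.sum += value; }
        @Override public Long getValue(SumAcc acc) { return acc.sum; }
    }

    // Assumed 1.10 wiring, mirroring StreamTableEnvironmentImpl.create() but in batch mode.
    public static StreamTableEnvironment createBatchEnv(StreamExecutionEnvironment env) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner().inBatchMode().build();
        TableConfig tableConfig = new TableConfig();
        ModuleManager moduleManager = new ModuleManager();
        CatalogManager catalogManager = new CatalogManager(
                settings.getBuiltInCatalogName(),
                new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
        FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);

        // The Blink executor factory's create(Map, StreamExecutionEnvironment) is not part of
        // the ExecutorFactory interface, so it is looked up reflectively (as create() does).
        Map<String, String> executorProperties = settings.toExecutorProperties();
        ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
        Method createMethod = executorFactory.getClass()
                .getMethod("create", Map.class, StreamExecutionEnvironment.class);
        Executor executor = (Executor) createMethod.invoke(executorFactory, executorProperties, env);

        Map<String, String> plannerProperties = settings.toPlannerProperties();
        Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
                .create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);

        return new StreamTableEnvironmentImpl(
                catalogManager, moduleManager, functionCatalog, tableConfig,
                env, planner, executor, false /* isStreamingMode */);
    }

    public static void main(String[] args) throws Exception {
        StreamTableEnvironment tEnv = createBatchEnv(StreamExecutionEnvironment.getExecutionEnvironment());
        // The AggregateFunction overload is now available in batch mode.
        tEnv.registerFunction("mysum", new MySum());
    }
}
```

Because this relies on internal classes, it is best treated as a stopgap for 1.10 rather than something to carry into later versions.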

Best Regards,
Zhenghua Gao


On Tue, Apr 14, 2020 at 5:58 PM Dmytro Dragan <[hidden email]> wrote:


Re: Registering UDAF in blink batch app

godfrey he
Hi Dmytro,

Currently, TableEnvironment does not support registering an AggregateFunction or a TableFunction, because the type extractor has not yet been unified for Java and Scala.

One approach is to use "TableEnvironment#createFunction", which registers the UDF in the catalog.
I found that "createTemporarySystemFunction" does not work yet. cc [hidden email]

Best,
Godfrey

On Tue, Apr 14, 2020 at 6:40 PM Zhenghua Gao <[hidden email]> wrote:

Re: Registering UDAF in blink batch app

Timo Walther
Hi Dmytro,

Table functions will be supported in Flink 1.11 with the new type system.
Hopefully, we can also support aggregate functions by then.

Regards,
Timo

On 14.04.20 15:33, godfrey he wrote:


Re: Registering UDAF in blink batch app

Jingsong Li
Hi Dmytro,

For 1.11:
As Godfrey said, you can use "TableEnvironment#createFunction/createTemporarySystemFunction". And, as Timo said, these functions will be supported with the new type system.

But for 1.10 and 1.9, a workaround is:
"tEnv.getCatalog(tEnv.getCurrentCatalog()).get().createFunction"
You may need to understand some catalog concepts.
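A sketch of this catalog workaround, assuming Flink 1.10 with the Blink planner. The function is stored in the current catalog and current database under a lowercase name, by class name only. It assumes the 1.10 `CatalogFunctionImpl(String className)` constructor; the 1.9 constructor has a different signature. Because only the class name is stored, the UDAF presumably has to be a public class with a public no-argument constructor so the planner can instantiate it reflectively. `registerMySum`, `MySum` and `SumAcc` are hypothetical names.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.functions.AggregateFunction;

public class CatalogUdafRegistration {

    public static class SumAcc {
        public long sum = 0L;
    }

    // Hypothetical UDAF; public, static and with an implicit public no-arg constructor.
    public static class MySum extends AggregateFunction<Long, SumAcc> {
        @Override
        public SumAcc createAccumulator() {
            return new SumAcc();
        }

        public void accumulate(SumAcc acc, Long value) {
            if (value != null) {
                acc.sum += value;
            }
        }

        @Override
        public Long getValue(SumAcc acc) {
            return acc.sum;
        }
    }

    // Registers MySum as <current catalog>.<current database>.mysum.
    public static void registerMySum(TableEnvironment tEnv) throws Exception {
        // getCatalog returns Optional<Catalog>; the current catalog is expected to exist.
        Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
        ObjectPath path = new ObjectPath(tEnv.getCurrentDatabase(), "mysum");
        // ignoreIfExists = false: throw if a function with this name already exists.
        catalog.createFunction(path, new CatalogFunctionImpl(MySum.class.getName()), false);
    }

    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner().inBatchMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        registerMySum(tEnv);
    }
}
```

After registration the function can be referenced from SQL, for example `SELECT mysum(v) FROM t`, where `t` and `v` are again hypothetical.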

Best,
Jingsong Lee

On Wed, Apr 15, 2020 at 2:46 PM Timo Walther <[hidden email]> wrote: