Hi All, Could you please tell how to register custom Aggregation function in blink batch app? In case of streaming mode: We create EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); Which has: <T,
ACC>
void
registerFunction(String name,
AggregateFunction<T,
ACC> aggregateFunction); But in case of batchMode, we need to create TableEnvironment: EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); Which does not have this function to register AggregationFunction, only Scalar one. Details: Flink 1.10, Java API
|
`StreamTableEnvironment.create()` yields a `StreamTableEnvironmentImpl` object, which has several `registerFunction` interface for ScalarFunction/TableFunction/AggregateFunction/TableAggregateFunction. `TableEnvironment.create()` yields a `TableEnvironmentImpl` object, which is a unify entry point for Table/SQL programs. And it only has a deprecated `registerFunction` interface for ScalarFunction. You should use `createTemporarySystemFunction` instead. A workaround for batch mode of blink planner is: You can use the public constructor of `StreamTableEnvironmentImpl` to create the TableEnvironment and use `registerFunction`s. Pls make sure you pass in the correct `isStreamingMode = false` Best Regards, Zhenghua Gao On Tue, Apr 14, 2020 at 5:58 PM Dmytro Dragan <[hidden email]> wrote:
|
Hi Dmytro, Currently, TableEnvironment does not support register AggregationFunction and TableFunction, because type extractor has not been unified for Java and Scala. One approach is we can use "TableEnvironment#createFunction" which will register UDF to catalog. I find "createTemporarySystemFunction" does not work now. cc [hidden email] Best, Godfrey Zhenghua Gao <[hidden email]> 于2020年4月14日周二 下午6:40写道:
|
Hi Dmytro,
table function will be supported in Flink 1.11 with the new type system. Hopefully, we can also support aggregate functions until then. Regards, Timo On 14.04.20 15:33, godfrey he wrote: > Hi Dmytro, > > Currently, TableEnvironment does not support > register AggregationFunction and TableFunction, because type extractor > has not been unified for Java and Scala. > > One approach is we can use "TableEnvironment#createFunction" which will > register UDF to catalog. > I find "createTemporarySystemFunction" does not work now. cc @Zhenghua > Gao <mailto:[hidden email]> > > Best, > Godfrey > > Zhenghua Gao <[hidden email] <mailto:[hidden email]>> 于2020年4月14 > 日周二 下午6:40写道: > > `StreamTableEnvironment.create()` yields a > `StreamTableEnvironmentImpl` object, > which has several `registerFunction` interface for > ScalarFunction/TableFunction/AggregateFunction/TableAggregateFunction. > > `TableEnvironment.create()` yields a `TableEnvironmentImpl` object, > which is a unify entry point for Table/SQL programs. > And it only has a deprecated `registerFunction` interface for > ScalarFunction. You should use `createTemporarySystemFunction` instead. > > A workaround for batch mode of blink planner is: You can use the > public constructor of `StreamTableEnvironmentImpl` to create > the TableEnvironment and use `registerFunction`s. Pls make sure you > pass in the correct `isStreamingMode = false` > > *Best Regards,* > *Zhenghua Gao* > > > On Tue, Apr 14, 2020 at 5:58 PM Dmytro Dragan > <[hidden email] <mailto:[hidden email]>> wrote: > > Hi All,____ > > __ __ > > Could you please tell how to register custom Aggregation > function in blink batch app?____ > > In case of streaming mode:____ > > We create ____ > > EnvironmentSettings bsSettings = > EnvironmentSettings./newInstance/().useBlinkPlanner().inStreamingMode().build(); > StreamTableEnvironment tableEnv = > StreamTableEnvironment./create/(env, bsSettings);____ > > __ __ > > Which has:____ > > <T, ACC> void registerFunction(String name, AggregateFunction<T, > ACC> aggregateFunction);____ > > __ __ > > But in case of batchMode, we need to create TableEnvironment:____ > > __ __ > > EnvironmentSettings bsSettings = > EnvironmentSettings./newInstance/().useBlinkPlanner().inBatchMode().build(); > tEnv = TableEnvironment./create/(bsSettings);____ > > __ __ > > Which does not have this function to register > AggregationFunction, only Scalar one.____ > > __ __ > > Details: Flink 1.10, Java API ____ > > __ __ > > __ __ > |
Hi Dmytro, For 1.11: Like Godfrey said, you can use "TableEnvironment#createFunction/createTemporarySystemFunction". And like Timo said, can support function with new type system. But for 1.10 and 1.9: A workaround way is: "tEnv.getCatalog(tEnv.getCurrentCatalog()).get().createFunction" You may need understand some catalog concept. Best, Jingsong Lee On Wed, Apr 15, 2020 at 2:46 PM Timo Walther <[hidden email]> wrote: Hi Dmytro, Best, Jingsong Lee |
Free forum by Nabble | Edit this page |