Hi Community,
I tried to write a UDF with a generic type, but Flink complains that it cannot recognize the type information when I use the function in SQL.

I checked the implementation of the built-in function "MAX" and realized that it does not use the same API (e.g. AggregationFunction) as user-defined functions. Is that the reason why "MAX" doesn't have the generic type issue?

How can I implement my own "MAX" function that supports all types?
Thanks.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
I believe you have to extend "org.apache.flink.table.functions.AggregateFunction" instead for it to work with SQL (or, more generally speaking, any subclass of "org.apache.flink.table.functions.UserDefinedFunction").

On 04.01.2019 05:18, yinhua.dai wrote:
> [...]
Hi Chesnay,
Maybe you misunderstood my question.
I have the code below:

    public class MyMaxAggregation<T> extends AggregateFunction<T, MyMaxAggregation.MyAccumulator> {
        @Override
        public MyAccumulator createAccumulator() {
            return new MyAccumulator();
        }

        @Override
        public T getValue(MyAccumulator accumulator) {
            return null;
        }

        static class MyAccumulator {
            double maxValue;
        }
    }

But

    tableEnv.registerFunction("MYMAX", new MyMaxAggregation<Integer>());

throws the following exception:

    Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'T' in 'class com.tr.apt.test.MyMaxAggregation' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). Otherwise the type has to be specified explicitly using type information.
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:882)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:769)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:762)
        at org.apache.flink.table.api.java.StreamTableEnvironment.registerFunction(StreamTableEnvironment.scala:482)
        at com.tr.apt.test.StreamingJob.main(StreamingJob.java:52)
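The "type erasure problem" named in the exception can be reproduced without Flink. The following is a minimal plain-Java sketch; `Base`, `GenericMax`, `IntegerMax` and `ErasureDemo` are hypothetical stand-ins and not Flink classes. It shows that reflection on an instance created as `new GenericMax<Integer>()` only finds the type variable `T`, while a subclass that binds the type in its own declaration exposes `Integer`.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for a generic base class such as AggregateFunction<T, ACC>.
abstract class Base<T> {}

// Generic subclass: T is still an unbound type variable in the class declaration.
class GenericMax<T> extends Base<T> {}

// Concrete subclass: T is bound to Integer in the class declaration itself.
class IntegerMax extends Base<Integer> {}

final class ErasureDemo {
    // Returns the type argument that the object's class passes to Base<...>.
    static Type typeArgumentOf(Object o) {
        ParameterizedType superType =
                (ParameterizedType) o.getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[0];
    }

    public static void main(String[] args) {
        // The <Integer> given at the call site is erased at runtime.
        System.out.println(typeArgumentOf(new GenericMax<Integer>())); // prints "T"
        // The binding written in the class declaration survives.
        System.out.println(typeArgumentOf(new IntegerMax())); // prints "class java.lang.Integer"
    }
}
```

The stack trace above goes through `TypeExtractor.createTypeInfoWithTypeHierarchy`, which presumably inspects the class hierarchy in a similar way and therefore cannot see the `Integer` passed to `new MyMaxAggregation<Integer>()`.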
Hi Yinhua,
Flink needs to know how to serialize and deserialize the type `T`. Because you are using a type variable here, Flink cannot derive the type information. You need to override org.apache.flink.table.functions.AggregateFunction#getResultType and return type information that matches.

Regards,
Timo

On 04.01.19 at 10:28, yinhua.dai wrote:
> [...]
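The idea of returning the type explicitly can be sketched in plain Java, without any Flink dependency. `TypedMax` below is a hypothetical class, and its `getResultType` method only plays the role of `AggregateFunction#getResultType`; in Flink the token would be a `TypeInformation<T>` rather than a `Class<T>`. The point of the sketch is that the caller hands the type in at construction time, so it no longer has to be recovered from the erased type variable.

```java
// Hypothetical, Flink-free model of an aggregate that reports its result type explicitly.
final class TypedMax<T extends Comparable<T>> {
    // Explicit type token supplied by the caller; unlike T, it exists at runtime.
    private final Class<T> resultType;
    // Current maximum, null until the first non-null value has been accumulated.
    private T max;

    TypedMax(Class<T> resultType) {
        this.resultType = resultType;
    }

    // Plays the role of AggregateFunction#getResultType in this sketch.
    Class<T> getResultType() {
        return resultType;
    }

    // Keeps the larger of the current maximum and the new value; nulls are ignored.
    void accumulate(T value) {
        if (value != null && (max == null || value.compareTo(max) > 0)) {
            max = value;
        }
    }

    T getValue() {
        return max;
    }

    public static void main(String[] args) {
        TypedMax<Integer> intMax = new TypedMax<>(Integer.class);
        intMax.accumulate(3);
        intMax.accumulate(7);
        intMax.accumulate(5);
        // prints "Integer max = 7"
        System.out.println(intMax.getResultType().getSimpleName() + " max = " + intMax.getValue());
    }
}
```

Note that each instance is still tied to one concrete type, so this pattern on its own does not give a single function that accepts every argument type.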
Hi Timo,
But getResultType should only return concrete type information, right? How could I implement it with a generic type?

Let me clarify my question. Say I want to implement my own "MAX" function and apply it to different types, e.g. integer, long, double, so I tried to write a class that extends AggregateFunction *with a generic type* to implement the max function.

Then I want to register only one function name for all types, e.g.

    tableEnv.registerFunction("MYMAX", new MyMax());

instead of

    tableEnv.registerFunction("MYINTEGERMAX", new MyIntegerMax());
    tableEnv.registerFunction("MYLONGMAX", new MyLongMax());
    tableEnv.registerFunction("MYDOUBLEMAX", new MyDoubleMax());

Is there a way to implement that? I know the built-in function "MAX" can be applied to all types, so I wonder whether I can implement that too.
Thanks.
Currently, there is no more flexible approach for aggregate functions. Scalar functions can be overloaded, but aggregate functions do not support this so far.

Regards,
Timo

On 07.01.19 at 02:27, yinhua.dai wrote:
> [...]
Hi Timo,
Can you let me know how the built-in "MAX" function is able to support different types?
Thanks.
Currently, this functionality is hard-coded in the aggregation translation, namely in `org.apache.flink.table.runtime.aggregate.AggregateUtil#transformToAggregateFunctions` [1].

Timo

[1] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala

On 08.01.19 at 06:41, yinhua.dai wrote:
> [...]
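To illustrate what "hard-coded" selection by argument type can look like, here is a small plain-Java sketch. It is not Flink's code: `MaxDispatch` and `maxFor` are hypothetical names, and the set of supported types is chosen only for the example. A single entry point inspects the argument type and returns a type-specific implementation, which is how one name can cover several types without generics.

```java
import java.util.function.BinaryOperator;

final class MaxDispatch {
    // Hypothetical dispatch: choose a type-specific max implementation
    // from the argument type, and reject types with no hard-coded case.
    static BinaryOperator<Object> maxFor(Class<?> argType) {
        if (argType == Integer.class) {
            return (a, b) -> Math.max((Integer) a, (Integer) b);
        }
        if (argType == Long.class) {
            return (a, b) -> Math.max((Long) a, (Long) b);
        }
        if (argType == Double.class) {
            return (a, b) -> Math.max((Double) a, (Double) b);
        }
        throw new IllegalArgumentException("MAX is not supported for " + argType.getName());
    }

    public static void main(String[] args) {
        System.out.println(maxFor(Long.class).apply(3L, 8L)); // prints "8"
    }
}
```

Because the choice is made inside the translation step rather than through the user-defined function API, a user-registered aggregate function does not get this dispatch.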