Re: The way to write a UDF with generic type

Posted by Timo Walther on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/The-way-to-write-a-UDF-with-generic-type-tp25329p25337.html

Hi Yinhua,

Flink needs to know how to serialize and deserialize the type `T`. If you
use a type variable here, Flink cannot derive the type information, because
the actual type argument is erased at runtime.
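
To illustrate why the type variable cannot be resolved, here is a minimal plain-Java sketch that does not depend on Flink. `Base`, `Generic`, and `Concrete` are hypothetical names standing in for a generic base class and its subclasses. Reflection only sees `T` for the generic subclass, and sees `Integer` only when the subclass fixes the type in its own declaration.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class ErasureDemo {
    // Hypothetical stand-in for a generic base class such as AggregateFunction<T, ACC>.
    static abstract class Base<T> {}

    // Generic subclass: T is still an unbound type variable in the class metadata.
    static class Generic<T> extends Base<T> {}

    // Concrete subclass: the declaration itself binds T to Integer.
    static class Concrete extends Base<Integer> {}

    // Returns the type argument that the given class passes to Base,
    // as recorded in the class file (not per instance).
    static Type typeArgument(Class<?> clazz) {
        ParameterizedType superType = (ParameterizedType) clazz.getGenericSuperclass();
        return superType.getActualTypeArguments()[0];
    }

    public static void main(String[] args) {
        // The <Integer> written at the call site is erased; only "T" is recoverable.
        Object generic = new Generic<Integer>();
        System.out.println(typeArgument(generic.getClass())); // prints "T"
        System.out.println(typeArgument(Concrete.class));     // prints "class java.lang.Integer"
    }
}
```

This is why `new MyMaxAggregation<Integer>()` carries no usable information about `Integer` at runtime, whereas a non-generic subclass would.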

You need to override
org.apache.flink.table.functions.AggregateFunction#getResultType and
return type information that matches the actual result type.
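
A minimal sketch of such an override, assuming an `AggregateFunction` API in which `getResultType` returns a `TypeInformation<T>`. The constructor parameter `resultType` is an assumption of this sketch and not part of the original code. As in the original snippet, `accumulate` is omitted and `getValue` is only a placeholder.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.AggregateFunction;

public class MyMaxAggregation<T>
        extends AggregateFunction<T, MyMaxAggregation.MyAccumulator> {

    // Assumption of this sketch: the caller supplies the concrete result type.
    private final TypeInformation<T> resultType;

    public MyMaxAggregation(TypeInformation<T> resultType) {
        this.resultType = resultType;
    }

    @Override
    public MyAccumulator createAccumulator() {
        return new MyAccumulator();
    }

    @Override
    public T getValue(MyAccumulator accumulator) {
        return null; // placeholder, as in the original snippet
    }

    // Hands Flink explicit type information instead of relying on extraction of T.
    @Override
    public TypeInformation<T> getResultType() {
        return resultType;
    }

    // Public class and field, in line with Flink's POJO rules.
    public static class MyAccumulator {
        public double maxValue;
    }
}
```

An instance could then be created as `new MyMaxAggregation<>(Types.INT)`, with `Types` from `org.apache.flink.api.common.typeinfo`. Whether registration then succeeds may depend on the Flink version: the stack trace in the quoted message shows `registerFunction` calling `TypeExtractor.createTypeInfo` directly, so some versions may still attempt extraction before consulting the override.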

Regards,
Timo



On 04.01.19 at 10:28, yinhua.dai wrote:

> Hi Chesnay,
>
> Maybe you misunderstood my question.
> I have the code below:
> public class MyMaxAggregation<T> extends AggregateFunction<T,
> MyMaxAggregation.MyAccumulator> {
>    @Override
>    public MyAccumulator createAccumulator() {
>      return new MyAccumulator();
>    }
>
>    @Override
>    public T getValue(MyAccumulator accumulator) {
>      return null;
>    }
>
>    static class MyAccumulator {
>      double maxValue;
>    }
>
> }
>
> But tableEnv.registerFunction("MYMAX", new MyMaxAggregation<Integer>());
> throws the exception below:
> Exception in thread "main"
> org.apache.flink.api.common.functions.InvalidTypesException: Type of
> TypeVariable 'T' in 'class com.tr.apt.test.MyMaxAggregation' could not be
> determined. This is most likely a type erasure problem. The type extraction
> currently supports types with generic variables only in cases where all
> variables in the return type can be deduced from the input type(s).
> Otherwise the type has to be specified explicitly using type information.
> at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:882)
> at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
> at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:769)
> at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:762)
> at org.apache.flink.table.api.java.StreamTableEnvironment.registerFunction(StreamTableEnvironment.scala:482)
> at com.tr.apt.test.StreamingJob.main(StreamingJob.java:52)