The way to write a UDF with generic type

yinhua.dai
Hi Community,

I tried to write a UDF with a generic type, but it seems that Flink complains
that it cannot recognize the type information when I use it in SQL.

I checked the implementation of the native function "MAX" and realized that it
does not use the same API (e.g. AggregationFunction) as user-defined functions.
Is that the reason why "MAX" doesn't have the generic type issue?

How can I implement my own "MAX" function which could support all types?
Thanks.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: The way to write a UDF with generic type

Chesnay Schepler
I believe you have to extend
"org.apache.flink.table.functions.AggregateFunction" instead for it to
work with SQL (or, more generally speaking, any sub-class of
"org.apache.flink.table.functions.UserDefinedFunction").

(In reply to yinhua.dai, 04.01.2019 05:18)

Re: The way to write a UDF with generic type

yinhua.dai
Hi Chesnay,

Maybe you misunderstood my question.
I have the code below:
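import org.apache.flink.table.functions.AggregateFunction;

// Generic MAX intended for any type T (accumulate(...) omitted for brevity).
public class MyMaxAggregation<T> extends AggregateFunction<T,
MyMaxAggregation.MyAccumulator> {
  @Override
  public MyAccumulator createAccumulator() {
    return new MyAccumulator();
  }

  @Override
  public T getValue(MyAccumulator accumulator) {
    return null; // placeholder: the actual max logic is not shown
  }

  static class MyAccumulator {
    double maxValue;
  }

}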

But tableEnv.registerFunction("MYMAX", new MyMaxAggregation<Integer>());
throws the following exception:
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'T' in 'class com.tr.apt.test.MyMaxAggregation' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). Otherwise the type has to be specified explicitly using type information.
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:882)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:769)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:762)
        at org.apache.flink.table.api.java.StreamTableEnvironment.registerFunction(StreamTableEnvironment.scala:482)
        at com.tr.apt.test.StreamingJob.main(StreamingJob.java:52)

Re: The way to write a UDF with generic type

Timo Walther
Hi Yinhua,

Flink needs to know how to serialize and deserialize a type `T`. If you
are using a type variable here, Flink cannot derive the type information.
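The "type erasure problem" named in the exception can be reproduced in plain Java. The sketch below is an illustration under stated assumptions: `Base`, `GenericMax`, `IntMax` and `firstTypeArgument` are hypothetical names, and there is no Flink dependency. It uses `java.lang.reflect` to read the type argument a class passes to its generic superclass. For `new GenericMax<Integer>()` only the variable `T` is recorded, while a concrete or anonymous subclass records `Integer`. This is the kind of information a reflection-based type extractor has to work with; Flink's own `TypeExtractor` logic is not reproduced here.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class ErasureDemo {
    // Hypothetical stand-in for a generic base such as AggregateFunction<T, ACC>.
    static abstract class Base<T, ACC> {}

    // Generic subclass, shaped like MyMaxAggregation<T> in the thread.
    static class GenericMax<T> extends Base<T, Object> {}

    // Concrete subclass: the type argument is written into the class signature.
    static class IntMax extends Base<Integer, Object> {}

    // Returns the first type argument the instance's class passes to its direct superclass.
    static Type firstTypeArgument(Object instance) {
        ParameterizedType superType =
            (ParameterizedType) instance.getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[0];
    }

    public static void main(String[] args) {
        // The <Integer> in this expression is erased; only the variable T is recorded.
        System.out.println(firstTypeArgument(new GenericMax<Integer>()).getTypeName()); // T
        // A concrete subclass keeps Integer.
        System.out.println(firstTypeArgument(new IntMax()).getTypeName()); // java.lang.Integer
        // So does an anonymous subclass, whose direct superclass is GenericMax<Integer>.
        System.out.println(firstTypeArgument(new GenericMax<Integer>() {}).getTypeName()); // java.lang.Integer
    }
}
```

This is also why a concrete subclass per type (like the `MyIntegerMax` idea later in the thread) sidesteps the error: the type argument is no longer a variable.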

You need to override
org.apache.flink.table.functions.AggregateFunction#getResultType and
return type information that matches.
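One way to follow this advice while keeping a single generic class is to pass the type information in through the constructor and return it from the result-type method. The sketch below shows that pattern without a Flink dependency: `Class<T>` stands in for Flink's `TypeInformation<T>` (in Flink the override would return something like `BasicTypeInfo.INT_TYPE_INFO`), and `TypedMax`, `Acc` and `of` are hypothetical names. Note that each instance is still bound to one concrete type, so this alone does not give a single registered name that works for all types.

```java
import java.util.Comparator;

public class TypedMax<T> {
    // Stand-in for TypeInformation<T>: supplied by the caller because it
    // cannot be recovered from the erased type variable T.
    private final Class<T> resultClass;
    private final Comparator<? super T> order;

    public TypedMax(Class<T> resultClass, Comparator<? super T> order) {
        this.resultClass = resultClass;
        this.order = order;
    }

    /** Mutable accumulator holding the running maximum (null until the first value). */
    public static final class Acc<V> {
        V max;
    }

    public Acc<T> createAccumulator() {
        return new Acc<>();
    }

    public void accumulate(Acc<T> acc, T value) {
        if (value == null) {
            return; // skip nulls, as SQL MAX does
        }
        if (acc.max == null || order.compare(value, acc.max) > 0) {
            acc.max = value;
        }
    }

    public T getValue(Acc<T> acc) {
        return acc.max;
    }

    // Plays the role of overriding AggregateFunction#getResultType.
    public Class<T> getResultType() {
        return resultClass;
    }

    // Convenience factory for naturally ordered types such as Integer, Long, Double.
    public static <E extends Comparable<? super E>> TypedMax<E> of(Class<E> cls) {
        return new TypedMax<>(cls, Comparator.naturalOrder());
    }

    public static void main(String[] args) {
        TypedMax<Integer> intMax = TypedMax.of(Integer.class);
        Acc<Integer> acc = intMax.createAccumulator();
        for (int v : new int[] {3, 9, 4}) {
            intMax.accumulate(acc, v);
        }
        System.out.println(intMax.getValue(acc) + " " + intMax.getResultType().getSimpleName()); // 9 Integer
    }
}
```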

Regards,
Timo



(In reply to yinhua.dai, 04.01.19 10:28)

Re: The way to write a UDF with generic type

yinhua.dai
Hi Timo,

But getResultType can only return concrete type information, right?
How could I implement it with a generic type?

I'd like to clarify my question again.
Say I want to implement my own "MAX" function, but I want to apply it to
different types, e.g. integer, long, double, etc., so I tried to write a class
that extends AggregateFunction *with a generic type* to implement the max
function.

Then I want to register only one function name for all types.
E.g.
tableEnv.registerFunction("MYMAX", new MyMax());
instead of
tableEnv.registerFunction("MYINTEGERMAX", new MyIntegerMax());
tableEnv.registerFunction("MYLONGMAX", new MyLongMax());
tableEnv.registerFunction("MYDOUBLEMAX", new MyDoubleMax());

Is there a way to implement that?
I know the built-in function "MAX" can apply to all types, so I wonder if I
can also implement that.
Thanks.

Re: The way to write a UDF with generic type

Timo Walther
Currently, there is no more flexible approach for aggregate functions.
Scalar functions can be overloaded, but aggregate functions do not
support this so far.
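Overloading here means that one function name maps to several `eval` methods and the right one is chosen from the argument types. The sketch below shows that idea in plain Java with a reflective lookup by exact parameter type. `Twice` and `call` are hypothetical names, and Flink's own overload resolution (for example any implicit casts it may apply) is not reproduced.

```java
import java.lang.reflect.Method;

public class OverloadDemo {
    // Hypothetical scalar-style function with one eval overload per input type.
    public static class Twice {
        public int eval(int x) { return 2 * x; }
        public long eval(long x) { return 2L * x; }
        public double eval(double x) { return 2.0 * x; }
    }

    // Resolves the eval overload from the declared argument type, then invokes it.
    static Object call(Object fn, Class<?> argType, Object arg) {
        try {
            Method eval = fn.getClass().getMethod("eval", argType);
            return eval.invoke(fn, arg);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("no eval(" + argType + ") on " + fn.getClass(), e);
        }
    }

    public static void main(String[] args) {
        Twice fn = new Twice();
        System.out.println(call(fn, int.class, 21));     // 42
        System.out.println(call(fn, long.class, 5L));    // 10
        System.out.println(call(fn, double.class, 1.5)); // 3.0
    }
}
```

Because each overload has a concrete signature, no type variable has to be resolved; a single generic aggregate class offers no such per-type signatures.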

Regards,
Timo


(In reply to yinhua.dai, 07.01.19 02:27)

Re: The way to write a UDF with generic type

yinhua.dai
Hi Timo,

Can you let me know how the built-in "MAX" function is able to support
different types?
Thanks.

Re: The way to write a UDF with generic type

Timo Walther
Currently, this functionality is hard-coded in the aggregation
translation, namely in
`org.apache.flink.table.runtime.aggregate.AggregateUtil#transformToAggregateFunctions`
[1].

Timo

[1]
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
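Hard-coding MAX in the translation suggests a dispatch step: the argument's type is already known when the query is translated, so a type-specific aggregate can be chosen instead of inferring a generic `T`. The sketch below illustrates that idea in plain Java. `SqlType`, `MaxAgg`, `natural` and `createMax` are hypothetical names and do not mirror the actual code in `AggregateUtil`.

```java
import java.util.Comparator;

public class MaxDispatchDemo {
    // Hypothetical subset of argument types known when the query is translated.
    enum SqlType { INTEGER, BIGINT, DOUBLE, VARCHAR }

    // Type-specific MAX; for brevity the running maximum lives on the object
    // instead of in a separate accumulator.
    static final class MaxAgg<T> {
        final Class<T> resultClass;          // fixed per instance, never inferred
        private final Comparator<? super T> order;
        private T max;                       // null until the first non-null value

        MaxAgg(Class<T> resultClass, Comparator<? super T> order) {
            this.resultClass = resultClass;
            this.order = order;
        }

        void accumulate(T value) {
            if (value != null && (max == null || order.compare(value, max) > 0)) {
                max = value;
            }
        }

        T getValue() {
            return max;
        }
    }

    static <T extends Comparable<? super T>> MaxAgg<T> natural(Class<T> cls) {
        return new MaxAgg<>(cls, Comparator.naturalOrder());
    }

    // Picks a concrete aggregate from the argument type, so no type variable
    // has to be recovered by reflection.
    static MaxAgg<?> createMax(SqlType argType) {
        switch (argType) {
            case INTEGER: return natural(Integer.class);
            case BIGINT:  return natural(Long.class);
            case DOUBLE:  return natural(Double.class);
            case VARCHAR: return natural(String.class);
            default: throw new IllegalArgumentException("MAX not supported for " + argType);
        }
    }

    public static void main(String[] args) {
        @SuppressWarnings("unchecked")
        MaxAgg<Long> agg = (MaxAgg<Long>) createMax(SqlType.BIGINT);
        agg.accumulate(5L);
        agg.accumulate(42L);
        agg.accumulate(7L);
        System.out.println(agg.getValue() + " " + agg.resultClass.getSimpleName()); // 42 Long
    }
}
```

A user-defined aggregate cannot plug into this step, which matches the answer above that aggregate functions cannot currently be overloaded.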

(In reply to yinhua.dai, 08.01.19 06:41)

Re: The way to write a UDF with generic type

yinhua.dai