Hi Flink folks,

In a Flink job using the SQL API that I'm working on, I have a custom POJO data type with a generic field, and I would like to call a user-defined function on this field. I've included a similar function below with the business logic stubbed out; it has the return type I'm looking for.

I have no issues using custom functions of this type in a SELECT statement as long as the `getResultType` method is left out of the user-defined function class, but I can't get the type information to resolve correctly in contexts like ORDER BY and GROUP BY clauses. It still doesn't work even if `getResultType` explicitly defines the specific type for a given object, because the job compiler within Flink seems to assume that the return type of the `eval` method is just Object (type erasure...), and it fails to generate the code because it detects invalid casts to the desired output type. Without the `getResultType` method, it fails to detect the type entirely. That is fine for a plain SELECT, but if I try to do any other operation (like GROUP BY), I get the following error:

"org.apache.flink.api.common.InvalidProgramException: This type (GenericType<java.lang.Object>) cannot be used as key."

Does anyone know if there's a way to get Flink to pay attention to the type information from `getResultType` when compiling the `eval` method so that the types work out? Or another way to work around the type erasure on the `eval` method without defining explicit user-defined function classes for each type?

Thanks for your help!
Morrisa

Code snippet:

    package flink_generics_testing;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.table.functions.ScalarFunction;

    /**
     * Reads custom values from a table and performs a function on those values.
     * T should be able to be a String, long, float, boolean, or Date.
     *
     * @param <T> The expected type of the table column values.
     */
    public class CustomScalarFunction<T> extends ScalarFunction {

        private static final long serialVersionUID = -5537657771138360838L;

        private final Class<T> desiredType;

        /**
         * Construct an instance.
         *
         * @param desiredType The type of the value that we're performing the function on.
         */
        public CustomScalarFunction(Class<T> desiredType) {
            this.desiredType = desiredType;
        }

        public T eval(T value) {
            return value;
        }

        @Override
        public TypeInformation<?> getResultType(Class<?>[] signature) {
            return TypeInformation.of(desiredType);
        }

        @Override
        public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
            return new TypeInformation<?>[]{ TypeInformation.of(desiredType) };
        }
    }
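To illustrate the type-erasure problem outside of Flink, here is a minimal plain-JDK sketch. `ErasureDemo` and `GenericFn` are hypothetical names (a stand-in for `CustomScalarFunction<T>` without the Flink base class), and the sketch assumes, as JingsongLee's reply in this thread suggests, that the planner works from the reflected signature of `eval`. The `desiredType` passed to the constructor is only runtime data on the instance; the method signature itself reports `Object` as the raw return type and the bare type variable `T` as the generic return type.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Type;

public class ErasureDemo {

    // Hypothetical stand-in for CustomScalarFunction<T>, without the Flink base class.
    public static class GenericFn<T> {
        private final Class<T> desiredType;

        public GenericFn(Class<T> desiredType) {
            this.desiredType = desiredType;
        }

        public Class<T> desiredType() {
            return desiredType;
        }

        public T eval(T value) {
            return value;
        }
    }

    // After erasure, eval(T) is compiled as eval(Object), so it is looked up with Object.class.
    private static Method evalMethod(Object fn) {
        try {
            return fn.getClass().getMethod("eval", Object.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    // The raw return type that reflection reports: the erasure of T.
    public static Class<?> erasedReturnType(Object fn) {
        return evalMethod(fn).getReturnType();
    }

    // The declared generic return type: only the type variable T, not a concrete class.
    public static Type genericReturnType(Object fn) {
        return evalMethod(fn).getGenericReturnType();
    }

    public static void main(String[] args) {
        GenericFn<String> fn = new GenericFn<>(String.class);
        System.out.println(fn.desiredType());       // class java.lang.String (runtime field)
        System.out.println(erasedReturnType(fn));    // class java.lang.Object (method signature)
        System.out.println(genericReturnType(fn));   // T
    }
}
```

Anything that inspects only the signature of `eval` therefore sees `Object`, which is consistent with the `GenericType<java.lang.Object>` in the error above.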
|
Hi Morrisa,
Usually, this means that your class is not recognized as a POJO. Please double-check the POJO requirements: a default constructor, getters and setters for every field, etc. You can use org.apache.flink.api.common.typeinfo.Types.POJO(...) to verify whether your class is a POJO or not.
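The check Timo suggests is `Types.POJO(YourType.class)`, which, as far as I know, throws an `InvalidTypesException` when Flink does not analyze the class as a POJO. For a quick look without a Flink dependency, the following is a simplified, hypothetical approximation of those rules (public standalone class, public no-argument constructor, each non-static, non-transient field either public and non-final or exposed through a public getter and setter). `PojoRulesCheck`, `Event`, and `BrokenEvent` are illustrative names only; Flink's own type extractor also inspects superclasses and applies further rules, so `Types.POJO(...)` remains the authority.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class PojoRulesCheck {

    // Hypothetical type that follows the rules: public, static, no-arg constructor, getter + setter.
    public static class Event {
        private String name;

        public Event() {}

        public String getName() { return name; }

        public void setName(String name) { this.name = name; }
    }

    // Hypothetical type that breaks them: no default constructor and no setter for 'name'.
    public static class BrokenEvent {
        private final String name;

        public BrokenEvent(String name) { this.name = name; }

        public String getName() { return name; }
    }

    /** Returns the simplified-rule violations for a class; an empty list means none were found. */
    public static List<String> violations(Class<?> clazz) {
        List<String> problems = new ArrayList<>();
        int classMods = clazz.getModifiers();
        if (!Modifier.isPublic(classMods)) {
            problems.add("class is not public");
        }
        if (clazz.isMemberClass() && !Modifier.isStatic(classMods)) {
            problems.add("class is a non-static inner class");
        }
        try {
            Constructor<?> ctor = clazz.getDeclaredConstructor();
            if (!Modifier.isPublic(ctor.getModifiers())) {
                problems.add("default constructor is not public");
            }
        } catch (NoSuchMethodException e) {
            problems.add("no default (no-argument) constructor");
        }
        for (Field field : clazz.getDeclaredFields()) {
            int mods = field.getModifiers();
            // Static, transient, and compiler-generated fields are not part of the POJO's state.
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                continue;
            }
            // Public, non-final fields can be accessed directly.
            if (Modifier.isPublic(mods) && !Modifier.isFinal(mods)) {
                continue;
            }
            String n = field.getName();
            String suffix = Character.toUpperCase(n.charAt(0)) + n.substring(1);
            boolean hasGetter = hasPublicMethod(clazz, "get" + suffix)
                    || hasPublicMethod(clazz, "is" + suffix);
            boolean hasSetter = hasPublicMethod(clazz, "set" + suffix, field.getType());
            if (!hasGetter || !hasSetter) {
                problems.add("field '" + n + "' lacks a public getter or setter");
            }
        }
        return problems;
    }

    // getMethod only returns public methods (including inherited ones).
    private static boolean hasPublicMethod(Class<?> clazz, String name, Class<?>... params) {
        try {
            clazz.getMethod(name, params);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(violations(Event.class)); // []
        System.out.println(violations(BrokenEvent.class));
    }
}
```

This only mirrors the structural rules; it does not model how Flink's type extractor resolves a generic field such as the one in the question.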
I hope this helps.
Regards,
Timo
On 16.05.19 at 23:18, Morrisa Brenner wrote:
|
Hi Morrisa:

It seems that the Flink planner does not support returning Object (or a generic type, which, as you say, is erased to Object) from a ScalarFunction. In ScalarFunctionCallGen:

val functionCallCode =

There should be a coercive conversion of the eval return value to support this situation. I have no idea how to bypass it. If you can modify the source code, you can change it like this to support a generic return type:

val functionCallCode =

Best,
JingsongLee
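JingsongLee's suggested fix is a coercion of `eval`'s result to the declared result type. The Scala change itself is not reproduced here; as a rough plain-Java analogy only (hypothetical `CoerceDemo`, `GenericFn`, and `callAndCoerce`, not Flink's ScalarFunctionCallGen or its generated code), the sketch calls `eval` through its erased signature, receives a plain `Object`, and coerces it with `Class.cast` to a result class supplied separately, the role that `getResultType` plays in the question.

```java
import java.lang.reflect.Method;

public class CoerceDemo {

    // Hypothetical stand-in for the generic UDF from the question (no Flink base class).
    public static class GenericFn<T> {
        public T eval(T value) {
            return value;
        }
    }

    /**
     * Calls eval through its erased signature (static result type: Object) and then
     * coerces the result to the separately declared result class.
     */
    public static <R> R callAndCoerce(Object fn, Object arg, Class<R> resultClass) {
        Object raw;
        try {
            Method eval = fn.getClass().getMethod("eval", Object.class);
            raw = eval.invoke(fn, arg); // only known to be an Object at this point
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
        // Explicit coercion; throws ClassCastException if the value does not match.
        return resultClass.cast(raw);
    }

    public static void main(String[] args) {
        GenericFn<String> fn = new GenericFn<>();
        String s = callAndCoerce(fn, "hello", String.class);
        System.out.println(s); // hello
    }
}
```

In this analogy, a mismatch between the declared type and the actual value surfaces as a `ClassCastException` at runtime instead of as a failure to compile the call.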
|
Hi JingsongLee and Timo,
Thanks for taking a look and for the feedback!
All the best,
Morrisa
|