Generic return type on a user-defined scalar function


Morrisa Brenner

Hi Flink folks,


In a Flink job using the SQL API that I’m working on, I have a custom POJO data type with a generic field, and I would like to be able to call a user-defined function on this field. I included a similar function below with the business logic stubbed out, but this example has the return type I'm looking for.


I have no issues using custom functions of this type in a select statement when the `getResultType` method is left out of the user-defined function class, but I can't get the type information to resolve correctly in contexts like order by and group by. It still doesn't work even if `getResultType` explicitly defines the specific type for a given object: the job compiler within Flink seems to assume that the return type of the `eval` method is just Object (type erasure...), and it fails to generate the object code because it detects invalid casts to the desired output type. Without the `getResultType` method, it fails to detect the type entirely. That seems to be fine for a plain select, but if I try any other operation (like group by) I get the following error: "org.apache.flink.api.common.InvalidProgramException: This type (GenericType<java.lang.Object>) cannot be used as key."


Does anyone know if there's a way to get Flink to pay attention to the type information from `getResultType` when compiling the `eval` method so that the types work out? Or another way to work around the type erasure on the eval method without defining explicit user-defined function classes for each type?
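To make the erasure point concrete, here is a minimal sketch in plain Java with no Flink dependency. `GenericIdentity` and `ErasureDemo` are hypothetical names used only for illustration; `GenericIdentity` stands in for the generic function in the post. It shows that reflection on a generic `eval(T)` reports `Object` as the return type, which is presumably all a code generator can see when it inspects the method.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for the generic function in the post (no Flink dependency).
class GenericIdentity<T> {
    public T eval(T value) {
        return value;
    }
}

class ErasureDemo {
    // Returns the return type of eval as reflection sees it after erasure.
    static Class<?> erasedReturnType() {
        for (Method m : GenericIdentity.class.getDeclaredMethods()) {
            if (m.getName().equals("eval")) {
                return m.getReturnType();
            }
        }
        throw new IllegalStateException("eval not found");
    }

    public static void main(String[] args) {
        // The type parameter T is erased to its bound, which is Object here.
        System.out.println(erasedReturnType()); // prints "class java.lang.Object"
    }
}
```

The `Class<T>` passed to the constructor in the original snippet survives at runtime, but it is not part of the `eval` signature, so it cannot change what the method itself declares.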


Thanks for your help!


Morrisa




Code snippet:



package flink_generics_testing;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.ScalarFunction;

/**
 * Reads custom values from a table and performs a function on those values.
 * T should be able to be a String, long, float, boolean, or Date
 *
 * @param <T> The expected type of the table column values.
 */
public class CustomScalarFunction<T> extends ScalarFunction {

  private static final long serialVersionUID = -5537657771138360838L;

  private final Class<T> desiredType;

  /**
   * Construct an instance.
   *
   * @param desiredType The type of the value that we're performing the function on.
   */
  public CustomScalarFunction(Class<T> desiredType) {
      this.desiredType = desiredType;
  }

  public T eval(T value) {
      return value;
  }

  @Override
  public TypeInformation<?> getResultType(Class<?>[] signature) {
      return TypeInformation.of(desiredType);
  }

  @Override
  public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
      return new TypeInformation<?>[]{
              TypeInformation.of(desiredType)
      };
  }
}



--
Morrisa Brenner
Software Engineer
225 Franklin St, Boston, MA 02110
klaviyo.com
Re: Generic return type on a user-defined scalar function

Timo Walther
Hi Morrisa,

Usually, this means that your class is not recognized as a POJO. Please check the POJO requirements again: a default constructor, getters and setters for every field, etc. You can use org.apache.flink.api.common.typeinfo.Types.POJO(...) to verify whether your class is a POJO.
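As a rough illustration of the requirements listed above, the following plain-Java sketch checks a class by reflection. It is an approximation written for this thread, not Flink's own check: `PojoCheckSketch`, `looksLikePojo`, `Event`, and `NotAPojo` are hypothetical names, and the rules encoded here (public class, public no-argument constructor, every non-static, non-transient field either public or covered by a getter and setter) are a simplification. For a real job, `Types.POJO(...)` as suggested above is the thing to rely on.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

class PojoCheckSketch {
    // Follows the rules: public class, no-arg constructor, getter/setter or public fields.
    public static class Event {
        private String name;
        public long count;
        public Event() {}
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    // Breaks the rules: no default constructor and no setter.
    public static class NotAPojo {
        private final String name;
        public NotAPojo(String name) { this.name = name; }
        public String getName() { return name; }
    }

    // Simplified approximation of the POJO rules (an assumption, not Flink's code).
    static boolean looksLikePojo(Class<?> c) {
        if (!Modifier.isPublic(c.getModifiers())) return false;
        try {
            c.getConstructor(); // requires a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field f : c.getDeclaredFields()) {
            int mods = f.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || f.isSynthetic()) continue;
            if (Modifier.isPublic(mods)) continue;
            String suffix = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
            if (!hasMethod(c, "get" + suffix, 0) || !hasMethod(c, "set" + suffix, 1)) return false;
        }
        return true;
    }

    // True if c has a public method with this name and parameter count.
    static boolean hasMethod(Class<?> c, String name, int paramCount) {
        for (Method m : c.getMethods()) {
            if (m.getName().equals(name) && m.getParameterCount() == paramCount) return true;
        }
        return false;
    }
}
```

Note that a field declared with a type parameter (such as the generic field mentioned in the original question) can pass a structural check like this one and still end up being treated as a generic type when its concrete type cannot be determined.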

I hope this helps.

Regards,
Timo


In reply to Morrisa Brenner's message of 16.05.19, 23:18.


Re: Generic return type on a user-defined scalar function

JingsongLee
Hi Morrisa:

It seems that the Flink planner does not support returning Object (or a generic type, which, as you say, is subject to type erasure) from the eval method of a ScalarFunction.
In ScalarFunctionCallGen:
val functionCallCode =
s"""
|${parameters.map(_.code).mkString("\n")}
|$resultTypeTerm $resultTerm = $functionReference.eval(
| ${parameters.map(_.resultTerm).mkString(", ")});
|""".stripMargin
The generated code should cast the return value of eval to the result type to support this situation.
I don't know of a way to bypass it. If you can modify the source code, you can change it as follows to support a generic return type:
val functionCallCode =
s"""
|${parameters.map(_.code).mkString("\n")}
|$resultTypeTerm $resultTerm = ($resultTypeTerm) $functionReference.eval(
| ${parameters.map(_.resultTerm).mkString(", ")});
|""".stripMargin
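The effect of the added cast can be sketched in plain Java. This is an illustration under assumptions, not the code Flink generates: `CastSketch`, `GenericIdentity`, and `callWithCast` are hypothetical names, and the raw-typed variable is meant to mimic generated code that refers to the function without its type argument, so that `eval` appears to return Object.

```java
class CastSketch {
    // Hypothetical stand-in for the generic scalar function (no Flink dependency).
    static class GenericIdentity<T> {
        public T eval(T value) {
            return value;
        }
    }

    @SuppressWarnings({"rawtypes", "unchecked"})
    static String callWithCast(String input) {
        // Raw type on purpose: the caller does not know T, so eval returns Object here.
        GenericIdentity fn = new GenericIdentity<String>();

        // Without a cast this line would not compile (Object cannot be converted to String):
        // String result = fn.eval(input);

        // With the explicit cast, mirroring the "($resultTypeTerm)" added above, it compiles.
        String result = (String) fn.eval(input);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(callWithCast("abc")); // prints "abc"
    }
}
```

The cast is only checked at runtime, so a mismatch between the declared result type and the value actually returned by `eval` would surface as a ClassCastException rather than a compile error.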

Best, JingsongLee

In reply to Timo Walther's message of Monday, May 20, 2019, 23:03.


Re: Generic return type on a user-defined scalar function

Morrisa Brenner
Hi JingsongLee and Timo,

Thanks for taking a look and for the feedback!

All the best,
Morrisa


In reply to JingsongLee's message of May 21, 2019, 12:10 AM.