Trying to create a generic aggregate UDF

Trying to create a generic aggregate UDF

Dylan Forciea

I am attempting to create an aggregate UDF that takes a generic parameter T, but for the life of me, I can’t seem to get it to work.

 

The UDF I’m trying to implement takes two input arguments, a value that is generic, and a date. It will choose the non-null value with the latest associated date. I had originally done this with separate Top 1 queries connected with a left join, but the memory usage seems far higher than doing this with a custom aggregate function.
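
In plain Scala terms, the intended semantics are roughly the following (a sketch with invented names, just to pin down the behavior):

import java.time.LocalDate

// For each group: among the rows whose value and date are both non-null,
// pick the value carrying the latest date (None if no such row exists).
def latestNonNull[T <: AnyRef](rows: Seq[(T, LocalDate)]): Option[T] =
  rows
    .filter { case (v, d) => v != null && d != null }
    .sortBy(_._2)(Ordering.fromLessThan[LocalDate]((a, b) => a.isBefore(b)))
    .lastOption
    .map(_._1)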

 

As a first attempt, I tried to use custom type inference to validate that the first argument type is the output type, so I could have a single function, and I used DataTypes.STRUCTURED to define the shape of my accumulator. However, that resulted in an exception like this whenever I tried to use a non-string value as the first argument:

[error] Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
[error]   at io$oseberg$flink$udf$LatestNonNullAccumulator$Converter.toInternal(Unknown Source)
[error]   at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:92)
[error]   at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:47)
[error]   at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:59)
[error]   at GroupAggsHandler$777.getAccumulators(Unknown Source)
[error]   at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:175)
[error]   at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:45)
[error]   at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
[error]   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
[error]   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
[error]   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
[error]   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
[error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
[error]   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
[error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
[error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
[error]   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
[error]   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
[error]   at java.lang.Thread.run(Thread.java:748)

Figuring that I can’t do something of that sort, I tried to follow the general approach in the Sum accumulator[1] in the Flink source code where separate classes are derived from a base class, and each advertises its accumulator shape, but ended up with the exact same stack trace as above when I tried to create and use a function specifically for a non-string type like Long.
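
That pattern looks roughly like this in Scala (a sketch of the idea, not the actual Flink source; the class names are invented):

import java.time.LocalDate
import org.apache.flink.table.functions.AggregateFunction

case class LatestAcc[T](var value: T, var date: LocalDate)

// One generic base class holding the shared logic...
abstract class LatestNonNullBase[T] extends AggregateFunction[T, LatestAcc[T]] {
  override def createAccumulator(): LatestAcc[T] =
    LatestAcc(null.asInstanceOf[T], null)
  override def getValue(acc: LatestAcc[T]): T = acc.value
  def accumulate(acc: LatestAcc[T], value: T, date: LocalDate): Unit =
    if (value != null && (acc.date == null || (date != null && date.isAfter(acc.date)))) {
      acc.value = value
      acc.date = date
    }
}

// ...and one concrete subclass per supported type, so that each one
// advertises a concrete accumulator shape.
class LongLatestNonNull extends LatestNonNullBase[java.lang.Long]
class StringLatestNonNull extends LatestNonNullBase[String]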

 

Is there something I’m missing as far as how this is supposed to be done? Everything I try either results in a stack trace like the above, or in type erasure issues when trying to get type information for the accumulator. If I copy the generic code multiple times and directly use Long or String rather than subclassing, then it works just fine. I appreciate any help I can get on this!

Regards,

Dylan Forciea

[1] https://github.com/apache/flink/blob/release-1.12.0/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala

Re: Trying to create a generic aggregate UDF

Timo Walther
Hi Dylan,

I'm assuming you are using Flink 1.12 and the Blink planner?

Beginning with 1.12 you can use the "new" aggregate functions with better
type inference, so TypeInformation will not be used in this stack.

I tried to come up with an example that should explain the rough design.
I will include this example in the Flink code base. I hope it helps:



import java.time.LocalDate;
import java.util.Optional;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.types.Row;

public static class LastIfNotNull<T>
        extends AggregateFunction<Row, LastIfNotNull.Accumulator<T>> {

    public static class Accumulator<T> {
        public T value;
        public LocalDate date;
    }

    public void accumulate(Accumulator<T> acc, T input, LocalDate date) {
        if (input != null) {
            acc.value = input;
            acc.date = date;
        }
    }

    @Override
    public Row getValue(Accumulator<T> acc) {
        return Row.of(acc.value, acc.date);
    }

    @Override
    public Accumulator<T> createAccumulator() {
        return new Accumulator<>();
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInference.newBuilder()
                .inputTypeStrategy(
                        InputTypeStrategies.sequence(
                                InputTypeStrategies.ANY,
                                InputTypeStrategies.explicit(DataTypes.DATE())))
                .accumulatorTypeStrategy(
                        callContext -> {
                            DataType accDataType =
                                    DataTypes.STRUCTURED(
                                            Accumulator.class,
                                            DataTypes.FIELD(
                                                    "value",
                                                    callContext.getArgumentDataTypes().get(0)),
                                            DataTypes.FIELD("date", DataTypes.DATE()));
                            return Optional.of(accDataType);
                        })
                .outputTypeStrategy(
                        callContext -> {
                            DataType argDataType = callContext.getArgumentDataTypes().get(0);
                            DataType outputDataType =
                                    DataTypes.ROW(
                                            DataTypes.FIELD("value", argDataType),
                                            DataTypes.FIELD("date", DataTypes.DATE()));
                            return Optional.of(outputDataType);
                        })
                .build();
    }
}
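
For completeness, registering and calling this from a Scala TableEnvironment might look like the following (a sketch; the table and column names are invented, and it assumes LastIfNotNull is accessible as a top-level class):

// env: org.apache.flink.table.api.TableEnvironment
env.createTemporarySystemFunction("LastIfNotNull", classOf[LastIfNotNull[_]])
env.executeSql("SELECT id, LastIfNotNull(v, dt) FROM readings GROUP BY id")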

Regards,
Timo




Re: Trying to create a generic aggregate UDF

Dylan Forciea
Timo,

I appreciate it! I am using Flink 1.12.0 right now with the Blink planner. What you proposed is roughly what I had come up with the first time around, which resulted in the ClassCastException stack trace I originally included. I saw that you used a Row instead of just the value in your example, but changing it that way didn't seem to help, which makes sense, since the problem seems to be in the code generated for the accumulator Converter and not in the output.

Here is the exact code that caused that error (while calling LatestNonNullLong):

The registration of the class below:
    env.createTemporarySystemFunction("LatestNonNullLong", classOf[LatestNonNull[Long]])
    env.createTemporarySystemFunction("LatestNonNullString", classOf[LatestNonNull[String]])
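
One detail about the registration above that may be relevant: because of JVM type erasure, both classOf expressions evaluate to the same Class object at runtime, so Flink receives the identical function class under both names. A quick sketch to illustrate:

    // Type parameters are erased, so these are the same runtime Class object:
    val longClass = classOf[LatestNonNull[Long]]
    val stringClass = classOf[LatestNonNull[String]]
    println(longClass eq stringClass) // prints: true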


The class itself:

import java.time.LocalDate
import java.util.Optional
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.catalog.DataTypeFactory
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.types.inference.{InputTypeStrategies, TypeInference}

case class LatestNonNullAccumulator[T](
    var value: T = null.asInstanceOf[T],
    var date: LocalDate = null)

class LatestNonNull[T] extends AggregateFunction[T, LatestNonNullAccumulator[T]] {

  override def createAccumulator(): LatestNonNullAccumulator[T] = {
    LatestNonNullAccumulator[T]()
  }

  override def getValue(acc: LatestNonNullAccumulator[T]): T = {
    acc.value
  }

  def accumulate(acc: LatestNonNullAccumulator[T], value: T, date: LocalDate): Unit = {
    if (value != null) {
      Option(acc.date).fold {
        acc.value = value
        acc.date = date
      } { accDate =>
        if (date != null && date.isAfter(accDate)) {
          acc.value = value
          acc.date = date
        }
      }
    }
  }

  def merge(
      acc: LatestNonNullAccumulator[T],
      it: java.lang.Iterable[LatestNonNullAccumulator[T]]): Unit = {
    val iter = it.iterator()
    while (iter.hasNext) {
      val a = iter.next()
      if (a.value != null) {
        Option(acc.date).fold {
          acc.value = a.value
          acc.date = a.date
        } { accDate =>
          Option(a.date).map { curDate =>
            if (curDate.isAfter(accDate)) {
              acc.value = a.value
              acc.date = a.date
            }
          }
        }
      }
    }
  }

  def resetAccumulator(acc: LatestNonNullAccumulator[T]): Unit = {
    acc.value = null.asInstanceOf[T]
    acc.date = null
  }

  override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = {
    TypeInference
      .newBuilder()
      .inputTypeStrategy(InputTypeStrategies
        .sequence(InputTypeStrategies.ANY, InputTypeStrategies.explicit(DataTypes.DATE())))
      .accumulatorTypeStrategy { callContext =>
        val accDataType = DataTypes.STRUCTURED(
          classOf[LatestNonNullAccumulator[T]],
          DataTypes.FIELD("value", callContext.getArgumentDataTypes.get(0)),
          DataTypes.FIELD("date", DataTypes.DATE()))

        Optional.of(accDataType)
      }
      .outputTypeStrategy { callContext =>
        val outputDataType = callContext.getArgumentDataTypes.get(0)
        Optional.of(outputDataType)
      }
      .build()
  }
}

Regards,
Dylan Forciea


Re: Trying to create a generic aggregate UDF

Dylan Forciea
As a side note, I also just tried unifying this into a single function registration, using _ as the type parameter in the classOf call there and within the TypeInference definition for the accumulator, and I still ended up with the exact same stack trace.
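
That unified registration would look something like this (a sketch; the exact code wasn't shown):

// Single registration using a wildcard for the erased type parameter:
env.createTemporarySystemFunction("LatestNonNull", classOf[LatestNonNull[_]])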

Dylan


Re: Trying to create a generic aggregate UDF

Dylan Forciea
Oh, I think I might have a clue as to what is going on. I notice that it works properly when I only ever call it on Long. I think it is reusing the same generated Converter code for whichever type was called first.

Since in Scala I can't declare a static nested class within the class itself, I wonder if Flink can't generate appropriate Converter code per subtype. I tried creating a type-specific subclass within my class and returning that as the accumulator, but that didn't help. And I can't refer to that class in the TypeInference since it isn't static, and Flink raises an error because of that. I'm going to see whether writing this UDF in Java with a nested public static class, like in your example, solves the problem. I'll report back with what I find. If that works, I'm not quite sure how to make it work in Scala.
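
For reference, the closest Scala analogue to Java's nested static class is a class defined inside a companion object, which compiles to a static nested class on the JVM (a sketch; whether Flink's structured-type extraction accepts it is exactly what is in question here):

import java.time.LocalDate

object LatestNonNull {
  // Compiles to the statically accessible JVM class LatestNonNull$Accumulator
  // with a public no-arg constructor, which is the kind of class Flink
  // expects for structured types.
  class Accumulator[T] {
    var value: T = _
    var date: LocalDate = _
  }
}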

Regards,
Dylan Forciea


Re: Trying to create a generic aggregate UDF

Dylan Forciea
Timo,

I converted what I had to Java and ended up with the exact same issue as before: it works if I only ever use it on one type, but not if I use it on multiple types. Maybe this is a bug?
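
In the meantime, the per-type copies mentioned at the start of the thread remain a working fallback; specialized to Long, that would look roughly like this (a sketch, with the merge and reset methods omitted):

import java.time.LocalDate
import org.apache.flink.table.functions.AggregateFunction

case class LatestNonNullLongAcc(
    var value: java.lang.Long = null,
    var date: LocalDate = null)

// A non-generic copy specialized to Long; the String version is identical
// apart from the types. Verbose, but it avoids sharing generated Converter code.
class LatestNonNullLong
    extends AggregateFunction[java.lang.Long, LatestNonNullLongAcc] {
  override def createAccumulator(): LatestNonNullLongAcc = LatestNonNullLongAcc()
  override def getValue(acc: LatestNonNullLongAcc): java.lang.Long = acc.value
  def accumulate(acc: LatestNonNullLongAcc, value: java.lang.Long, date: LocalDate): Unit =
    if (value != null && (acc.date == null || (date != null && date.isAfter(acc.date)))) {
      acc.value = value
      acc.date = date
    }
}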

Dylan

On 1/20/21, 10:06 AM, "Dylan Forciea" <[hidden email]> wrote:

    Oh, I think I might have a clue as to what is going on. I notice that it will work properly when I only call it on Long. I think that it is using the same generated code for the Converter for whatever was called first.

    Since in Scala I can't declare an object as static within the class itself, I wonder if it won't generate appropriate Converter code per subtype. I tried creating a subclass that is specific to the type within my class and returning that as the accumulator, but that didn't help. And, I can't refer to that class in the TypeInference since it isn't static and I get an error from Flink because of that. I'm going to see if I just write this UDF in Java with an embedded public static class like you have if it will solve my problems. I'll report back to let you know what I find. If that works, I'm not quite sure how to make it work in Scala.

    Regards,
    Dylan Forciea

    On 1/20/21, 9:34 AM, "Dylan Forciea" <[hidden email]> wrote:

        As a side note, I also just tried to unify into a single function registration and used _ as the type parameter in the classOf calls there and within the TypeInference definition for the accumulator and still ended up with the exact same stack trace.

        Dylan

        On 1/20/21, 9:22 AM, "Dylan Forciea" <[hidden email]> wrote:

            Timo,

            I appreciate it! I am using Flink 1.12.0 right now with the Blink planner. What you proposed is roughly what I had come up with the first time around that resulted in the stack trace with the ClassCastException I had originally included. I saw that you had used a Row instead of just the value in our example, but changing it that way didn't seem to help, which makes sense since the problem seems to be in the code generated for the accumulator Converter and not the output.

            Here is the exact code that caused that error (while calling LatestNonNullLong):

            The registration of the below:
                env.createTemporarySystemFunction("LatestNonNullLong", classOf[LatestNonNull[Long]])
                env.createTemporarySystemFunction("LatestNonNullString", classOf[LatestNonNull[String]])


            The class itself:

            import java.time.LocalDate
            import java.util.Optional
            import org.apache.flink.table.api.DataTypes
            import org.apache.flink.table.catalog.DataTypeFactory
            import org.apache.flink.table.functions.AggregateFunction
            import org.apache.flink.table.types.inference.{InputTypeStrategies, TypeInference}

            case class LatestNonNullAccumulator[T](
                var value: T = null.asInstanceOf[T],
                var date: LocalDate = null)

            class LatestNonNull[T] extends AggregateFunction[T, LatestNonNullAccumulator[T]] {

              override def createAccumulator(): LatestNonNullAccumulator[T] = {
                LatestNonNullAccumulator[T]()
              }

              override def getValue(acc: LatestNonNullAccumulator[T]): T = {
                acc.value
              }

              def accumulate(acc: LatestNonNullAccumulator[T], value: T, date: LocalDate): Unit = {
                if (value != null) {
                  Option(acc.date).fold {
                    acc.value = value
                    acc.date = date
                  } { accDate =>
                    if (date != null && date.isAfter(accDate)) {
                      acc.value = value
                      acc.date = date
                    }
                  }
                }
              }

              def merge(
                  acc: LatestNonNullAccumulator[T],
                  it: java.lang.Iterable[LatestNonNullAccumulator[T]]): Unit = {
                val iter = it.iterator()
                while (iter.hasNext) {
                  val a = iter.next()
                  if (a.value != null) {
                    Option(acc.date).fold {
                      acc.value = a.value
                      acc.date = a.date
                    } { accDate =>
                      Option(a.date).map { curDate =>
                        if (curDate.isAfter(accDate)) {
                          acc.value = a.value
                          acc.date = a.date
                        }
                      }
                    }
                  }
                }
              }

              def resetAccumulator(acc: LatestNonNullAccumulator[T]): Unit = {
                acc.value = null.asInstanceOf[T]
                acc.date = null
              }

              override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = {
                TypeInference
                  .newBuilder()
                  .inputTypeStrategy(InputTypeStrategies
                    .sequence(InputTypeStrategies.ANY, InputTypeStrategies.explicit(DataTypes.DATE())))
                  .accumulatorTypeStrategy { callContext =>
                    val accDataType = DataTypes.STRUCTURED(
                      classOf[LatestNonNullAccumulator[T]],
                      DataTypes.FIELD("value", callContext.getArgumentDataTypes.get(0)),
                      DataTypes.FIELD("date", DataTypes.DATE()))

                    Optional.of(accDataType)
                  }
                  .outputTypeStrategy { callContext =>
                    val outputDataType = callContext.getArgumentDataTypes().get(0);
                    Optional.of(outputDataType);
                  }
                  .build()
              }
            }

            Regards,
            Dylan Forciea

            On 1/20/21, 2:37 AM, "Timo Walther" <[hidden email]> wrote:

                Hi Dylan,

                I'm assuming your are using Flink 1.12 and the Blink planner?

                Beginning from 1.12 you can use the "new" aggregate functions with a
                better type inference. So TypeInformation will not be used in this stack.

                I tried to come up with an example that should explain the rough design.
                I will include this example into the Flink code base. I hope this helps:



                import java.time.LocalDate;
                import java.util.Optional;

                import org.apache.flink.table.api.DataTypes;
                import org.apache.flink.table.catalog.DataTypeFactory;
                import org.apache.flink.table.functions.AggregateFunction;
                import org.apache.flink.table.types.DataType;
                import org.apache.flink.table.types.inference.InputTypeStrategies;
                import org.apache.flink.table.types.inference.TypeInference;
                import org.apache.flink.types.Row;

                public static class LastIfNotNull<T>
                        extends AggregateFunction<Row, LastIfNotNull.Accumulator<T>> {

                    public static class Accumulator<T> {
                        public T value;
                        public LocalDate date;
                    }

                    // Simplified on purpose: keeps the last non-null value seen
                    // (no date comparison), to show the type inference design.
                    public void accumulate(Accumulator<T> acc, T input, LocalDate date) {
                        if (input != null) {
                            acc.value = input;
                            acc.date = date;
                        }
                    }

                    @Override
                    public Row getValue(Accumulator<T> acc) {
                        return Row.of(acc.value, acc.date);
                    }

                    @Override
                    public Accumulator<T> createAccumulator() {
                        return new Accumulator<>();
                    }

                    @Override
                    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
                        return TypeInference.newBuilder()
                                .inputTypeStrategy(
                                        InputTypeStrategies.sequence(
                                                InputTypeStrategies.ANY,
                                                InputTypeStrategies.explicit(DataTypes.DATE())))
                                .accumulatorTypeStrategy(
                                        callContext -> {
                                            DataType accDataType =
                                                    DataTypes.STRUCTURED(
                                                            Accumulator.class,
                                                            DataTypes.FIELD(
                                                                    "value",
                                                                    callContext.getArgumentDataTypes().get(0)),
                                                            DataTypes.FIELD("date", DataTypes.DATE()));
                                            return Optional.of(accDataType);
                                        })
                                .outputTypeStrategy(
                                        callContext -> {
                                            DataType argDataType =
                                                    callContext.getArgumentDataTypes().get(0);
                                            DataType outputDataType =
                                                    DataTypes.ROW(
                                                            DataTypes.FIELD("value", argDataType),
                                                            DataTypes.FIELD("date", DataTypes.DATE()));
                                            return Optional.of(outputDataType);
                                        })
                                .build();
                    }
                }
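
                For completeness, registering and calling this from the Scala side might look roughly like the following (a sketch only; the table and column names are assumed):

                // Generics are erased in bytecode, so a wildcard class literal is enough.
                env.createTemporarySystemFunction("LastIfNotNull", classOf[LastIfNotNull[_]])
                env.sqlQuery("SELECT id, LastIfNotNull(v, d) AS latest FROM t GROUP BY id")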

                Regards,
                Timo



                On 20.01.21 01:04, Dylan Forciea wrote:
                > [...]






Re: Trying to create a generic aggregate UDF

Timo Walther
Hi Dylan,

thanks for the investigation. I can now also reproduce it in my code. Yes,
this is a bug. I opened

https://issues.apache.org/jira/browse/FLINK-21070

and will try to fix this asap.

Thanks,
Timo

On 20.01.21 17:52, Dylan Forciea wrote:

> Timo,
>
> I converted what I had to Java, and ended up with the exact same issue as before: it works if I only ever use it on one type, but not if I use it on multiple. Maybe this is a bug?
>
> Dylan
>
> On 1/20/21, 10:06 AM, "Dylan Forciea" <[hidden email]> wrote:
>
>      Oh, I think I might have a clue as to what is going on. I notice that it will work properly when I only call it on Long. I think that it is using the same generated code for the Converter for whatever was called first.
>
>      Since in Scala I can't declare an object as static within the class itself, I wonder if it won't generate appropriate Converter code per subtype. I tried creating a subclass that is specific to the type within my class and returning that as the accumulator, but that didn't help. And, I can't refer to that class in the TypeInference since it isn't static and I get an error from Flink because of that. I'm going to see if I just write this UDF in Java with an embedded public static class like you have if it will solve my problems. I'll report back to let you know what I find. If that works, I'm not quite sure how to make it work in Scala.
>
>      Regards,
>      Dylan Forciea
>
>      On 1/20/21, 9:34 AM, "Dylan Forciea" <[hidden email]> wrote:
>
>          As a side note, I also just tried to unify into a single function registration and used _ as the type parameter in the classOf calls there and within the TypeInference definition for the accumulator and still ended up with the exact same stack trace.
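>
>          That unified attempt looked roughly like this (a sketch):
>
>              env.createTemporarySystemFunction("LatestNonNull", classOf[LatestNonNull[_]])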
>
>          Dylan
>
>          On 1/20/21, 9:22 AM, "Dylan Forciea" <[hidden email]> wrote:
>
>              Timo,
>
>              I appreciate it! I am using Flink 1.12.0 right now with the Blink planner. What you proposed is roughly what I had come up with the first time around, which resulted in the stack trace with the ClassCastException I had originally included. I saw that you had used a Row instead of just the value in your example, but changing it that way didn't seem to help, which makes sense since the problem seems to be in the code generated for the accumulator Converter and not the output.
>
>              Here is the exact code that caused that error (while calling LatestNonNullLong):
>
>              [...]
>


Re: Trying to create a generic aggregate UDF

Timo Walther
I opened a PR. Feel free to try it out.

https://github.com/apache/flink/pull/14720

Btw:

 >> env.createTemporarySystemFunction("LatestNonNullLong",
 >> classOf[LatestNonNull[Long]])
 >>
 >> env.createTemporarySystemFunction("LatestNonNullString",
 >> classOf[LatestNonNull[String]])

don't make a difference. The generics are type-erased in the bytecode,
and only the class name matters.
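
For illustration, a self-contained sketch of the erasure point (nothing
Flink-specific here):

    class LatestNonNull[T]

    object ErasureDemo extends App {
      // Prints "true": the type parameter is erased at runtime, so both
      // class literals resolve to the same java.lang.Class instance.
      println(classOf[LatestNonNull[Long]] eq classOf[LatestNonNull[String]])
    }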

Thanks,
Timo

On 21.01.21 11:36, Timo Walther wrote:

> [...]


Re: Trying to create a generic aggregate UDF

Dylan Forciea
Timo,

Will do! I have been patching in a change locally that I have a PR [1] out for, so if this fix ends up in the next 1.12 patch release, I may add this in with it once it has been approved and merged.

On a side note, that PR has been out since the end of October (it looks like I need to do a rebase to accommodate the code reformatting change that occurred since). Is there a process for getting somebody to review it? Not sure if, with the New Year and the 1.12 release and follow-up, it just got lost in the commotion.

Regards,
Dylan Forciea

[1] https://github.com/apache/flink/pull/13787

On 1/21/21, 8:50 AM, "Timo Walther" <[hidden email]> wrote:

    [...]



Re: Trying to create a generic aggregate UDF

Timo Walther
Hi Dylan,

I can help with a review of your PR tomorrow. In general, I would
recommend just pinging the people who have worked on the component
before (see git blame) a couple of times to get a review. We are all
busy and need a bit of pushing from time to time ;-)

Thanks,
Timo

On 21.01.21 16:09, Dylan Forciea wrote:

> Timo,
>
> Will do! I have been locally patching in a change that I have a PR [1] out for, so if this fix ends up in the next 1.12 patch release, I may include mine along with it once it has been approved and merged.
>
> On a side note, that PR has been out since the end of October (it looks like I need to do a rebase to accommodate the code reformatting change that happened since). Is there a process for getting somebody to review it? I'm not sure whether, with the New Year and the 1.12 release and follow-up, it just got lost in the commotion.
>
> Regards,
> Dylan Forciea
>
> [1] https://github.com/apache/flink/pull/13787
>
> On 1/21/21, 8:50 AM, "Timo Walther" <[hidden email]> wrote:
>
>      I opened a PR. Feel free to try it out.
>
>      https://github.com/apache/flink/pull/14720
>
>      Btw:
>
>       >> env.createTemporarySystemFunction("LatestNonNullLong",
>       >> classOf[LatestNonNull[Long]])
>       >>
>       >> env.createTemporarySystemFunction("LatestNonNullString",
>       >> classOf[LatestNonNull[String]])
>
>      don't make a difference. The generics will be type erased in bytecode
>      and only the class name matters.
>
>      Thanks,
>      Timo
>
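For illustration, a minimal Scala sketch of the erasure point made above; it assumes the LatestNonNull class quoted further down in this thread:

    // Sketch only: Scala erases the type parameter, so both classOf
    // expressions yield the very same java.lang.Class object at runtime.
    val longClass = classOf[LatestNonNull[Long]]
    val stringClass = classOf[LatestNonNull[String]]
    println(longClass eq stringClass) // prints "true"
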
>      On 21.01.21 11:36, Timo Walther wrote:
>      > Hi Dylan,
>      >
>      > thanks for the investigation. I can now also reproduce it in my code. Yes,
>      > this is a bug. I opened
>      >
>      > https://issues.apache.org/jira/browse/FLINK-21070
>      >
>      > and will try to fix this asap.
>      >
>      > Thanks,
>      > Timo
>      >
>      > On 20.01.21 17:52, Dylan Forciea wrote:
>      >> Timo,
>      >>
>      >> I converted what I had to Java, and ended up with the exact same issue
>      >> as before, where it will work if I only ever use it on one type, but not
>      >> if I use it on multiple. Maybe this is a bug?
>      >>
>      >> Dylan
>      >>
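A minimal sketch of the failure mode described above, assuming the LatestNonNull class quoted further down; the table and column names here are made up for illustration:

    // Both functions registered, each used on its own type in one job.
    env.createTemporarySystemFunction("LatestNonNullLong", classOf[LatestNonNull[Long]])
    env.createTemporarySystemFunction("LatestNonNullString", classOf[LatestNonNull[String]])

    // Per the report above: calling only one of them works, while a job
    // that exercises both ends in the ClassCastException quoted earlier.
    env.sqlQuery(
      """SELECT id,
        |       LatestNonNullString(name, updated) AS latest_name,
        |       LatestNonNullLong(amount, updated) AS latest_amount
        |FROM t
        |GROUP BY id""".stripMargin)
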
>      >> On 1/20/21, 10:06 AM, "Dylan Forciea" <[hidden email]> wrote:
>      >>
>      >>      Oh, I think I might have a clue as to what is going on. I noticed
>      >> that it will work properly when I only call it on Long. I think that
>      >> it is reusing the same generated Converter code for whatever type was
>      >> called first.
>      >>
>      >>      Since in Scala I can't declare an object as static within the
>      >> class itself, I wonder if it won't generate appropriate Converter code
>      >> per subtype. I tried creating a subclass that is specific to the type
>      >> within my class and returning that as the accumulator, but that didn't
>      >> help. And, I can't refer to that class in the TypeInference since it
>      >> isn't static, and I get an error from Flink because of that. I'm going
>      >> to see whether writing this UDF in Java with an embedded public static
>      >> class like yours will solve my problems. I'll report back to
>      >> let you know what I find. If that works, I'm not quite sure how to
>      >> make it work in Scala.
>      >>
>      >>      Regards,
>      >>      Dylan Forciea
>      >>
>      >>      On 1/20/21, 9:34 AM, "Dylan Forciea" <[hidden email]> wrote:
>      >>
>      >>          As a side note, I also just tried to unify this into a single
>      >> function registration, using _ as the type parameter in the classOf
>      >> calls there and within the TypeInference definition for the
>      >> accumulator, and still ended up with the exact same stack trace.
>      >>
>      >>          Dylan
>      >>
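The single-registration variant mentioned in this side note would look roughly like the sketch below (the registered name is illustrative); per the message above, it produced the same stack trace:

    // Sketch: one registration with an existential (wildcard) type
    // parameter instead of one registration per concrete type.
    env.createTemporarySystemFunction("LatestNonNull", classOf[LatestNonNull[_]])
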
>      >>          On 1/20/21, 9:22 AM, "Dylan Forciea" <[hidden email]> wrote:
>      >>
>      >>              Timo,
>      >>
>      >>              I appreciate it! I am using Flink 1.12.0 right now with
>      >> the Blink planner. What you proposed is roughly what I had come up
>      >> with the first time around that resulted in the stack trace with the
>      >> ClassCastException I had originally included. I saw that you had used
>      >> a Row instead of just the value in our example, but changing it that
>      >> way didn't seem to help, which makes sense since the problem seems to
>      >> be in the code generated for the accumulator Converter and not the
>      >> output.
>      >>
>      >>              Here is the exact code that caused that error (while
>      >> calling LatestNonNullLong):
>      >>
>      >>              The registration of the below:
>      >>
>      >> env.createTemporarySystemFunction("LatestNonNullLong", classOf[LatestNonNull[Long]])
>      >> env.createTemporarySystemFunction("LatestNonNullString", classOf[LatestNonNull[String]])
>      >>
>      >>              The class itself:
>      >>
>      >> import java.time.LocalDate
>      >> import java.util.Optional
>      >> import org.apache.flink.table.api.DataTypes
>      >> import org.apache.flink.table.catalog.DataTypeFactory
>      >> import org.apache.flink.table.functions.AggregateFunction
>      >> import org.apache.flink.table.types.inference.{InputTypeStrategies, TypeInference}
>      >>
>      >> case class LatestNonNullAccumulator[T](
>      >>     var value: T = null.asInstanceOf[T],
>      >>     var date: LocalDate = null)
>      >>
>      >> class LatestNonNull[T] extends AggregateFunction[T, LatestNonNullAccumulator[T]] {
>      >>
>      >>   override def createAccumulator(): LatestNonNullAccumulator[T] = {
>      >>     LatestNonNullAccumulator[T]()
>      >>   }
>      >>
>      >>   override def getValue(acc: LatestNonNullAccumulator[T]): T = {
>      >>     acc.value
>      >>   }
>      >>
>      >>   def accumulate(acc: LatestNonNullAccumulator[T], value: T, date: LocalDate): Unit = {
>      >>     if (value != null) {
>      >>       Option(acc.date).fold {
>      >>         acc.value = value
>      >>         acc.date = date
>      >>       } { accDate =>
>      >>         if (date != null && date.isAfter(accDate)) {
>      >>           acc.value = value
>      >>           acc.date = date
>      >>         }
>      >>       }
>      >>     }
>      >>   }
>      >>
>      >>   def merge(
>      >>       acc: LatestNonNullAccumulator[T],
>      >>       it: java.lang.Iterable[LatestNonNullAccumulator[T]]): Unit = {
>      >>     val iter = it.iterator()
>      >>     while (iter.hasNext) {
>      >>       val a = iter.next()
>      >>       if (a.value != null) {
>      >>         Option(acc.date).fold {
>      >>           acc.value = a.value
>      >>           acc.date = a.date
>      >>         } { accDate =>
>      >>           Option(a.date).map { curDate =>
>      >>             if (curDate.isAfter(accDate)) {
>      >>               acc.value = a.value
>      >>               acc.date = a.date
>      >>             }
>      >>           }
>      >>         }
>      >>       }
>      >>     }
>      >>   }
>      >>
>      >>   def resetAccumulator(acc: LatestNonNullAccumulator[T]): Unit = {
>      >>     acc.value = null.asInstanceOf[T]
>      >>     acc.date = null
>      >>   }
>      >>
>      >>   override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = {
>      >>     TypeInference
>      >>       .newBuilder()
>      >>       .inputTypeStrategy(InputTypeStrategies
>      >>         .sequence(InputTypeStrategies.ANY, InputTypeStrategies.explicit(DataTypes.DATE())))
>      >>       .accumulatorTypeStrategy { callContext =>
>      >>         val accDataType = DataTypes.STRUCTURED(
>      >>           classOf[LatestNonNullAccumulator[T]],
>      >>           DataTypes.FIELD("value", callContext.getArgumentDataTypes.get(0)),
>      >>           DataTypes.FIELD("date", DataTypes.DATE()))
>      >>         Optional.of(accDataType)
>      >>       }
>      >>       .outputTypeStrategy { callContext =>
>      >>         val outputDataType = callContext.getArgumentDataTypes().get(0)
>      >>         Optional.of(outputDataType)
>      >>       }
>      >>       .build()
>      >>   }
>      >> }
>      >>
>      >>              Regards,
>      >>              Dylan Forciea
>      >>
>      >>              On 1/20/21, 2:37 AM, "Timo Walther" <[hidden email]>
>      >> wrote:
>      >>
>      >>                  Hi Dylan,
>      >>
>      >>                  I'm assuming you are using Flink 1.12 and the Blink
>      >> planner?
>      >>
>      >>                  Beginning with 1.12 you can use the "new" aggregate
>      >> functions with
>      >>                  better type inference, so TypeInformation will not be
>      >> used in this stack.
>      >>
>      >>                  I tried to come up with an example that should
>      >> explain the rough design.
>      >>                  I will include this example in the Flink code base.
>      >> I hope this helps:
>      >>
>      >>
>      >>
>      >> import org.apache.flink.table.types.inference.InputTypeStrategies;
>      >>
>      >> public static class LastIfNotNull<T>
>      >>         extends AggregateFunction<Row, LastIfNotNull.Accumulator<T>> {
>      >>
>      >>     public static class Accumulator<T> {
>      >>         public T value;
>      >>         public LocalDate date;
>      >>     }
>      >>
>      >>     public void accumulate(Accumulator<T> acc, T input, LocalDate date) {
>      >>         if (input != null) {
>      >>             acc.value = input;
>      >>             acc.date = date;
>      >>         }
>      >>     }
>      >>
>      >>     @Override
>      >>     public Row getValue(Accumulator<T> acc) {
>      >>         return Row.of(acc.value, acc.date);
>      >>     }
>      >>
>      >>     @Override
>      >>     public Accumulator<T> createAccumulator() {
>      >>         return new Accumulator<>();
>      >>     }
>      >>
>      >>     @Override
>      >>     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
>      >>         return TypeInference.newBuilder()
>      >>                 .inputTypeStrategy(
>      >>                         InputTypeStrategies.sequence(
>      >>                                 InputTypeStrategies.ANY,
>      >>                                 InputTypeStrategies.explicit(DataTypes.DATE())))
>      >>                 .accumulatorTypeStrategy(
>      >>                         callContext -> {
>      >>                             DataType accDataType =
>      >>                                     DataTypes.STRUCTURED(
>      >>                                             Accumulator.class,
>      >>                                             DataTypes.FIELD(
>      >>                                                     "value",
>      >>                                                     callContext.getArgumentDataTypes().get(0)),
>      >>                                             DataTypes.FIELD("date", DataTypes.DATE()));
>      >>                             return Optional.of(accDataType);
>      >>                         })
>      >>                 .outputTypeStrategy(
>      >>                         callContext -> {
>      >>                             DataType argDataType = callContext.getArgumentDataTypes().get(0);
>      >>                             DataType outputDataType =
>      >>                                     DataTypes.ROW(
>      >>                                             DataTypes.FIELD("value", argDataType),
>      >>                                             DataTypes.FIELD("date", DataTypes.DATE()));
>      >>                             return Optional.of(outputDataType);
>      >>                         })
>      >>                 .build();
>      >>     }
>      >> }
>      >>
>      >>                  Regards,
>      >>                  Timo
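For completeness, a Scala sketch of how this example could be registered and called; the registered name, table, and column names are assumptions for illustration:

    // Sketch only: register the Java example function and call it from SQL.
    env.createTemporarySystemFunction("LastIfNotNull", classOf[LastIfNotNull[_]])
    env.sqlQuery("SELECT id, LastIfNotNull(v, d) AS latest FROM t GROUP BY id")
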
>      >>
>      >>
>      >>
>      >>                  On 20.01.21 01:04, Dylan Forciea wrote:
>      >>                  > [original message trimmed]
>      >
>
>


Re: Trying to create a generic aggregate UDF

Dylan Forciea
In reply to this post by Timo Walther
I wanted to report that I tried out your PR, and it does solve my issue. I am able to create a generic LatestNonNull and it appears to do what is expected.

Thanks,
Dylan Forciea

On 1/21/21, 8:50 AM, "Timo Walther" <[hidden email]> wrote:

    [quoted thread trimmed]