queryLogicalType != sinkLogicalType when UDAF returns List<Integer>

queryLogicalType != sinkLogicalType when UDAF returns List<Integer>

Dongwon Kim-2
Hello,

I'm using Flink-1.11.2.

Let's assume that I want to store in a table the result of the following UDAF:
  public class Agg extends AggregateFunction<List<Integer>, List<Integer>> {
    @Override
    public List<Integer> createAccumulator() {
      return new LinkedList<>();
    }
    @Override
    public List<Integer> getValue(List<Integer> acc) {
      return acc;
    }
    public void accumulate(List<Integer> acc, int i) {
      acc.add(i);
    }
    @Override
    public TypeInformation<List<Integer>> getResultType() {
      return new ListTypeInfo<>(Integer.class);
    }
  }

The main program looks as follows:
public class TestMain {
  public static void main(String[] args) {
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
      .inBatchMode()
      .build();
    TableEnvironment tEnv = TableEnvironment.create(settings);
    tEnv.executeSql(
      "CREATE TEMPORARY FUNCTION agg AS '" + Agg.class.getName() + "'"
    );
    Table t = tEnv.sqlQuery(
      "SELECT agg(c2)\n" +
        "FROM (VALUES (ROW('a',1)), (ROW('a',2))) AS T(c1,c2)\n" +
        "GROUP BY c1"
    );
    tEnv.executeSql(
      "CREATE TABLE output (a ARRAY<INT>) WITH ('connector' = 'print')"
    );
    /**
     * root
     *  |-- EXPR$0: RAW('java.util.List', ?)
     */
    t.printSchema();
    t.executeInsert("output");
  }
}

This program fails with the following exception:
Exception in thread "main" org.apache.flink.table.api.TableException: A raw type backed by type information has no serializable string representation. It needs to be resolved into a proper raw type.
at org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101)
at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:92)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
at scala.Option.map(Option.scala:146)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565)
at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549)
at my.TestMain.main(TestMain.java:62)

I found that the two types do not match:
- queryLogicalType : ROW<`EXPR$0` RAW('java.util.List', ?)>
- sinkLogicalType : ROW<`a` ARRAY<INT>>

Why does the queryLogicalType contain 'RAW' instead of 'ARRAY'?
Is there no way for a UDAF to return java.util.List<T> and store it as ARRAY?

Thanks in advance,

Dongwon
Re: queryLogicalType != sinkLogicalType when UDAF returns List<Integer>

Timo Walther
Hi,

First of all, we don't support ListTypeInfo in the Table API; therefore it
is treated as a RAW type. The exception thrown while creating the error
message is a bug that should be fixed in a future version. But the type
mismatch itself is valid:

ARRAY<INT> corresponds not to a list type info but to
`Types.OBJECT_ARRAY(Types.INT)`. Can you try that as the result type of
your aggregate function?

Regards,
Timo
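[Editor's note] Stripped of everything Flink-specific, the suggested fix boils down to handing the planner an Integer[] instead of a List<Integer>. A minimal plain-Java sketch of that conversion, reusing the getValue/accumulator pattern from the thread (no Flink dependencies; class and method names here are illustrative only):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ToArraySketch {
    // Mirrors the UDAF pattern from the thread: accumulate into a List,
    // but return a boxed Integer[] so the Table API planner can map the
    // result to ARRAY<INT> instead of falling back to a RAW type.
    static Integer[] getValue(List<Integer> acc) {
        // toArray(new Integer[0]) copies the list into a new Integer[]
        return acc.toArray(new Integer[0]);
    }

    public static void main(String[] args) {
        List<Integer> acc = new ArrayList<>();
        acc.add(1);
        acc.add(2);
        Integer[] result = getValue(acc);
        System.out.println(Arrays.toString(result)); // prints [1, 2]
    }
}
```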


On 26.11.20 18:13, Dongwon Kim wrote:

Re: queryLogicalType != sinkLogicalType when UDAF returns List<Integer>

Dongwon Kim-2
Hi Timo,

Okay, then the aggregate function should look like this:
  public static class Agg extends AggregateFunction<Integer[], ArrayList<Integer>> {
    @Override
    public ArrayList<Integer> createAccumulator() {
      return new ArrayList<>();
    }
    @Override
    public Integer[] getValue(ArrayList<Integer> acc) {
      return acc.toArray(new Integer[0]);
    }
    public void accumulate(ArrayList<Integer> acc, int i) {
      acc.add(i);
    }
    @Override
    public TypeInformation<Integer[]> getResultType() {
      return Types.OBJECT_ARRAY(Types.INT);
    }
  }

Now the program outputs:
2> +I([1, 2]) 
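[Editor's note] One detail worth highlighting: the result type must be the boxed Integer[] (matching Types.OBJECT_ARRAY), not a primitive int[], because a List<Integer> holds boxed elements and copies naturally only into an object array. A quick stdlib-only illustration (no Flink needed):

```java
import java.util.Arrays;
import java.util.List;

public class BoxedArrayNote {
    public static void main(String[] args) {
        List<Integer> acc = Arrays.asList(1, 2);

        // A List<Integer> copies directly into a boxed Integer[].
        Integer[] boxed = acc.toArray(new Integer[0]);

        // A primitive int[] would require an explicit unboxing step instead.
        int[] primitive = acc.stream().mapToInt(Integer::intValue).toArray();

        System.out.println(Arrays.toString(boxed) + " " + Arrays.toString(primitive));
        // prints [1, 2] [1, 2]
    }
}
```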

Thanks, 

Dongwon 

On Fri, Nov 27, 2020 at 5:38 PM Timo Walther <[hidden email]> wrote: