Help with table UDF

Flavio Pompermaier
Hi all,
I'm using Flink 1.3.1 and I'm trying to register a UDF, but something is going wrong.
I always get the following exception:

java.lang.UnsupportedOperationException: org.apache.flink.table.expressions.TableFunctionCall cannot be transformed to RexNode
at org.apache.flink.table.expressions.Expression.toRexNode(Expression.scala:53)
at org.apache.flink.table.expressions.Alias.toRexNode(fieldExpression.scala:79)
at org.apache.flink.table.plan.logical.Project$$anonfun$construct$1.apply(operators.scala:94)
at org.apache.flink.table.plan.logical.Project$$anonfun$construct$1.apply(operators.scala:94)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.flink.table.plan.logical.Project.construct(operators.scala:94)
at org.apache.flink.table.plan.logical.LogicalNode.toRelNode(LogicalNode.scala:77)
at org.apache.flink.table.api.Table.getRelNode(table.scala:94)
at 

-------------------------------------------------
This is my Program:

final ExecutionEnvironment env = DatalinksExecutionEnvironment.getExecutionEnv();
final BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
    
DataSet<Row> dataSet = null;
dataSet = env.fromElements("{\"test\":\"val\"}").map(new MapFunction<String, Row>() {

    @Override
    public Row map(String value) throws Exception {
      Row ret = new Row(1);
      ret.setField(0, value);
      return ret;
    }
  }).returns(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO));

tEnv.registerFunction("myFunc", new MyTableFunction());
Table test = tEnv.fromDataSet(dataSet, "field1");
Table res = test.select("field1,myFunc(recon)");
dataSet = tEnv.toDataSet(res,
    new RowTypeInfo(
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO));
dataSet.print();


MyTableFunction is something like:

public class MyTableFunction extends TableFunction<String> {
  public String eval(String str) {
    return "XXX";
  }
}


What am I doing wrong here?

Thanks in advance,
Flavio

Re: Help with table UDF

Fabian Hueske-2
Hi Flavio,

you're not using the TableFunction correctly. A TableFunction cannot be called inside select(); the documentation shows how to call it in a join() method.
But I agree, the error message should be better.
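
For reference, the join() form from the documentation would look roughly like test.join("myFunc(field1) as res").select("field1, res") for the program above. The alias res is an assumed name, and field1 stands in for recon because field1 is the only field the table defines. A TableFunction also emits its results by calling collect(...) from a void eval method rather than returning a value. The plain-Java sketch below does not use Flink at all; it only models the semantics, with hypothetical names (TableFunctionJoinSketch, myFunc, join). The function may emit zero or more values per input row, and the join pairs each row with every value emitted for it, which is presumably why such a call cannot be planned as a single expression in select().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Illustrative model only: none of these names are part of the Flink API.
class TableFunctionJoinSketch {

    // Stand-in for a table function: returns the values it would emit for one input.
    static List<String> myFunc(String field1) {
        List<String> emitted = new ArrayList<>();
        emitted.add("XXX"); // a real TableFunction would call collect("XXX") here
        return emitted;
    }

    // Pairs every input row with every value the function emits for that row.
    // Rows for which nothing is emitted are dropped, as in an inner join.
    static List<String[]> join(List<String> rows,
                               Function<String, List<String>> tableFunction) {
        List<String[]> result = new ArrayList<>();
        for (String row : rows) {
            for (String value : tableFunction.apply(row)) {
                result.add(new String[] {row, value});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("{\"test\":\"val\"}");
        for (String[] r : join(input, TableFunctionJoinSketch::myFunc)) {
            System.out.println(r[0] + "," + r[1]);
        }
    }
}
```

Running main prints {"test":"val"},XXX: one output row with two string fields, matching the two-field RowTypeInfo passed to toDataSet in the original program.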

Best, Fabian


In reply to Flavio Pompermaier's message of 2017-08-31 18:53 GMT+02:00.