Hi all,
I'm using Flink 1.3.1 and I'm trying to register a UDF, but something is wrong. I always get the following exception:

java.lang.UnsupportedOperationException: org.apache.flink.table.expressions.TableFunctionCall cannot be transformed to RexNode
    at org.apache.flink.table.expressions.Expression.toRexNode(Expression.scala:53)
    at org.apache.flink.table.expressions.Alias.toRexNode(fieldExpression.scala:79)
    at org.apache.flink.table.plan.logical.Project$$anonfun$construct$1.apply(operators.scala:94)
    at org.apache.flink.table.plan.logical.Project$$anonfun$construct$1.apply(operators.scala:94)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.flink.table.plan.logical.Project.construct(operators.scala:94)
    at org.apache.flink.table.plan.logical.LogicalNode.toRelNode(LogicalNode.scala:77)
    at org.apache.flink.table.api.Table.getRelNode(table.scala:94)
    at
-------------------------------------------------

This is my program:

    final ExecutionEnvironment env = DatalinksExecutionEnvironment.getExecutionEnv();
    final BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
    DataSet<Row> dataSet = null;
    dataSet = env.fromElements("{\"test\":\"val\"}").map(new MapFunction<String, Row>() {
        @Override
        public Row map(String value) throws Exception {
            Row ret = new Row(1);
            ret.setField(0, value);
            return ret;
        }
    }).returns(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO));

    tEnv.registerFunction("myFunc", new MyTableFunction());
    Table test = tEnv.fromDataSet(dataSet, "field1");
    Table res = test.select("field1,myFunc(recon)");
    dataSet = tEnv.toDataSet(res, new RowTypeInfo(
        BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
    dataSet.print();

MyTableFunction is something like:

    public class MyTableFunction extends TableFunction<String> {
        public String eval(String str) {
            return "XXX";
        }
    }

What am I doing wrong here?

Thanks in advance,
Flavio
Hi Flavio,

you're not using the TableFunction correctly: it cannot be called directly inside select(). The documentation shows how to apply it with the join() method. But I agree, the error message should be better.

Best,
Fabian

(In reply to Flavio Pompermaier <[hidden email]>, 2017-08-31 18:53 GMT+02:00.)
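For readers landing on this thread, here is a minimal sketch of the join() usage Fabian refers to. It has not been run against 1.3.1 and rests on a few assumptions: that Table.join(String) accepts a table function call in the 1.3 Java Table API (later releases changed this signature), that the function's input is the existing column field1, and that the output column name funcOut and the class name TableFunctionJoinSketch are purely illustrative. It also assumes a table function's eval() emits rows through collect() instead of returning a value.

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Hypothetical class name, for illustration only.
public class TableFunctionJoinSketch {

    // A table function emits zero or more rows per input row via collect();
    // eval() is declared void and does not return the result.
    public static class MyTableFunction extends TableFunction<String> {
        public void eval(String str) {
            collect("XXX");
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Single String column, named field1 when converted to a Table.
        DataSet<String> input = env.fromElements("{\"test\":\"val\"}");
        Table test = tEnv.fromDataSet(input, "field1");

        tEnv.registerFunction("myFunc", new MyTableFunction());

        // Assumption: join(String) is the 1.3 way to apply a table function.
        // "as (funcOut)" names the column produced by the function.
        Table res = test
            .join("myFunc(field1) as (funcOut)")
            .select("field1, funcOut");

        tEnv.toDataSet(res, Row.class).print();
    }
}
```

If the function is only meant to produce one value per input row, a ScalarFunction whose eval() returns a String may be the better fit; a scalar function can be called directly inside select().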