Dear All,
I have a question about TableSource. I defined a TableSource by implementing StreamTableSource, registered it as a table, and executed the query "select f0 from myTable". Finally, I converted the result table to a DataStream. The following error occurred during execution. How can I solve it?

Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: Incompatible types of expression and result type.
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:966)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:964)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

My code is as follows:
======================================
public static void main(String[] args) throws Exception {
package com.xiaoju.manhattan.fbi.data.calc.source;
Hi,
I think I found the problem in your implementation. The Table API program is correct. However, the DataStream program that you construct in your TableSource has the wrong type. Whenever you use a Row type, you need to specify the type information, either by implementing ResultTypeQueryable or, in your case, by supplying the type info as the second parameter:

DataStream<Row> dataStream = execEnv.addSource(new SourceFunction<Row>() {
    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
    }

    @Override
    public void cancel() {
    }
}, Types.ROW(Types.STRING(), Types.STRING(), Types.STRING()));

Otherwise your SourceFunction will have a generic black-box type that cannot be accessed by the Table API.

Regards,
Timo

On 10/23/17 at 1:01 PM, 韩宁宁 wrote:

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(environment);
tableEnvironment.registerTableSource("myTable", new MyTableSource());
String sql = "select f0 from myTable";
Table sqlResult = tableEnvironment.sql(sql);
DataStream<Tuple2<Boolean, String>> result = tableEnvironment.toRetractStream(sqlResult, String.class);
result.print();
environment.execute();
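
The reply above names two ways to provide the Row type but only shows the second one (passing the type info to addSource). For completeness, here is a minimal sketch of the first option, implementing ResultTypeQueryable on the source function itself. The class name TypedRowSource and the emitted values are hypothetical and not from the original thread. It assumes three String fields, as in the reply, and that RowTypeInfo assigns the default field names f0, f1, f2 so that "select f0 from myTable" can resolve.

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.types.Row;

// Hypothetical source (name and data are illustrative only).
// It declares its own Row type, so addSource(...) needs no second argument.
public class TypedRowSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {

    private static final long serialVersionUID = 1L;

    // Set to false by cancel() so that run() can stop emitting.
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        // Emit one illustrative row with three String fields, then return.
        if (running) {
            Row row = new Row(3);
            row.setField(0, "a");
            row.setField(1, "b");
            row.setField(2, "c");
            ctx.collect(row);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        // Three String fields; this replaces the generic black-box type.
        return new RowTypeInfo(
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);
    }
}
```

With such a class, the TableSource could create its stream with execEnv.addSource(new TypedRowSource()), and the stream's type would be taken from getProducedType() instead of being extracted from the generic signature.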