Incompatible types of expression and result type.


韩宁宁
Dear All,
    I have a question about TableSource.
    I defined a TableSource by implementing StreamTableSource, registered it as a table, and executed the query "select f0 from myTable". Finally, I converted the result table to a DataStream.
The following error occurred during execution. How can I solve it?
Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: Incompatible types of expression and result type.
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:966)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:964)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    My code is as follows:
    ======================================
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(environment);

    tableEnvironment.registerTableSource("myTable", new MyTableSource());
    String sql = "select f0 from myTable";
    Table sqlResult = tableEnvironment.sql(sql);

    DataStream<Tuple2<Boolean, String>> result = tableEnvironment.toRetractStream(sqlResult, String.class);
    result.print();

    environment.execute();
}

package com.xiaoju.manhattan.fbi.data.calc.source;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

public class MyTableSource implements StreamTableSource<Row> {
    @Override
    public TypeInformation<Row> getReturnType() {
        TypeInformation<Row> typeInformation = new RowTypeInfo(Types.STRING, Types.STRING, Types.STRING); //Types.STRING,Types.STRING,Types.STRING
        return typeInformation;
    }

    @Override
    public String explainSource() {
        return "";
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {

        DataStream<Row> dataStream = execEnv.addSource(new SourceFunction<Row>() {
            private long count = 0L;
            private volatile boolean isRunning = true;
            private String str = "{\"ak\":\"av\",\"bk\":\"bv\",\"ck\":\"cv\"}";

            @Override
            public void run(SourceContext<Row> ctx) throws Exception {
                while (isRunning && count < 10) {
                    synchronized (ctx.getCheckpointLock()) {
                        ObjectMapper objectMapper = new ObjectMapper();
                        JsonNode jsonNode = objectMapper.readTree(str);
                        Row row = new Row(jsonNode.size() - 1);
                        for (int i = 0; i < jsonNode.size(); i++) {
                            row.setField(i, jsonNode.get(i));
                        }
                        ctx.collect(row);
                        count++;
                    }
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        return dataStream;
    }
}

Re: Incompatible types of expression and result type.

Timo Walther
Hi,

I found the problem in your implementation. The Table API program is correct. However, the DataStream program that you construct in your TableSource has the wrong type. Whenever you use a Row type, you need to specify the type information explicitly, either by implementing ResultTypeQueryable or, in your case, by supplying the type information as the second parameter of addSource.


DataStream<Row> dataStream = execEnv.addSource(new SourceFunction<Row>() {
    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
    }

    @Override
    public void cancel() {
    }
}, Types.ROW(Types.STRING(),Types.STRING(),Types.STRING()));

Otherwise your SourceFunction will have a generic black-box type whose fields cannot be accessed by the Table API.
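
To illustrate why the Row type ends up as a black box, here is a rough, Flink-free sketch in plain Java. It is not Flink's actual type extraction. Row and SourceFunction below are hypothetical stand-ins written only for this example, and elementTypeOf is a made-up helper. The sketch shows that reflection on an anonymous source can recover the element class (Row), but nothing about how many fields a Row has or what their types are, because the fields sit in an untyped Object[]. That missing information is what has to be supplied explicitly.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class RowTypeErasureDemo {

    // Hypothetical stand-in for a Row: fields are kept in an untyped Object[].
    static final class Row {
        private final Object[] fields;

        Row(int arity) {
            this.fields = new Object[arity];
        }

        void setField(int pos, Object value) {
            fields[pos] = value;
        }

        int getArity() {
            return fields.length;
        }
    }

    // Hypothetical stand-in for a source interface with one type parameter.
    interface SourceFunction<T> {
        T next();
    }

    // Returns the type argument that reflection can recover from a class
    // that directly implements SourceFunction<T>.
    static Type elementTypeOf(SourceFunction<?> source) {
        for (Type t : source.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) t;
                if (p.getRawType() == SourceFunction.class) {
                    return p.getActualTypeArguments()[0];
                }
            }
        }
        throw new IllegalArgumentException("not a direct SourceFunction implementation");
    }

    // An anonymous source, similar in shape to the one in the question.
    static SourceFunction<Row> newAnonymousSource() {
        return new SourceFunction<Row>() {
            @Override
            public Row next() {
                Row row = new Row(3);
                row.setField(0, "av");
                row.setField(1, "bv");
                row.setField(2, "cv");
                return row;
            }
        };
    }

    public static void main(String[] args) {
        // Only the class Row is recoverable here; the three String field
        // types are not part of the generic signature.
        Type recovered = elementTypeOf(newAnonymousSource());
        System.out.println(recovered.getTypeName());
    }
}
```

The recovered type is just the Row class. The arity and the String field types exist only in the values created at runtime, which is why the field types have to be declared separately, as in the addSource call above.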

Regards,
Timo



On 10/23/17 at 1:01 PM, 韩宁宁 wrote:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(environment);

    tableEnvironment.registerTableSource("myTable",new MyTableSource());
    String sql = "select f0 from myTable";
    Table sqlResult = tableEnvironment.sql(sql);

    DataStream<Tuple2<Boolean,String>> result = tableEnvironment.toRetractStream(sqlResult,String.class);
    result.print();

    environment.execute();