Hi,

I tried writing a simple SQL query with a custom StreamTableSource, and it fails with this error:

org.apache.flink.table.codegen.CodeGenException: Arity of result type does not match number of expressions.
    at org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:940)
    at org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:883)
    at org.apache.flink.table.plan.nodes.CommonScan$class.generatedConversionFunction(CommonScan.scala:57)
    at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.generatedConversionFunction(StreamTableSourceScan.scala:35)
    at org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:48)
    at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.convertToInternalRow(StreamTableSourceScan.scala:35)
    at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:107)

You can check the source code here:
|
Sorry, I forgot to add the link:

2017-06-09 20:19 GMT+02:00 Dawid Wysakowicz <[hidden email]>:
|
Hi David,
I think the problem is that the type of the DataStream produced by the TableSource does not match the type declared in `getReturnType()`. The result of a `MapFunction<xxx, Row>` is always a generic type (because Row cannot be analyzed). One solution is for the mapper to implement `ResultTypeQueryable`.

I agree that the error should be thrown earlier, not in the CodeGenerator. Can you create an issue for this?

Btw, the Table API supports nested types, so it should work if the TableSource returns `SongEvent` directly.

Regards,
Timo

On 09.06.17 at 20:19, Dawid Wysakowicz wrote:
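For readers following along, the suggestion to implement `ResultTypeQueryable` could look roughly like the sketch below. It is not the code from the thread: the `SongEvent` case class, its fields, and the `SongEventToRow` name are hypothetical stand-ins, and the field list is only an assumption about what the TableSource declares in `getReturnType()`. The point is that the mapper reports a `RowTypeInfo` itself, so the stream is typed as a Row with named fields instead of a generic type.

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
import org.apache.flink.types.Row

// Hypothetical event type; the real SongEvent is not shown in the thread.
// The enum field is assumed to be already rendered as a String here.
case class SongEvent(userId: Long, songId: Long, eventType: String)

// Maps SongEvent to Row and tells Flink the exact row type it produces.
class SongEventToRow extends MapFunction[SongEvent, Row] with ResultTypeQueryable[Row] {

  override def map(event: SongEvent): Row = {
    val row = new Row(3)
    row.setField(0, Long.box(event.userId))
    row.setField(1, Long.box(event.songId))
    row.setField(2, event.eventType)
    row
  }

  // Assumption: this must agree with what the TableSource's getReturnType() declares
  // (same arity, same field types, same field names).
  override def getProducedType: TypeInformation[Row] =
    new RowTypeInfo(
      Array[TypeInformation[_]](
        BasicTypeInfo.LONG_TYPE_INFO,
        BasicTypeInfo.LONG_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO),
      Array("userId", "songId", "eventType"))
}
```

With a mapper like this, `stream.map(new SongEventToRow)` should carry the three-field row type rather than a generic one, which is the mismatch the CodeGenerator complained about.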
|
Thanks a lot, Timo. After I added the ResultTypeQueryable interface to my mapper, it worked.

As for SongEvent, the reason I tried remapping it to Row is that it has an enum field on which I want to filter, so my first approach was to remap that field to String in the TableSource. What do you think is the way to go in such a case?

After successfully producing a DataStream[Row], I tried something like:

`tEnv.toAppendStream(table)(TypeInformation.of(classOf[UserSongsStatistics])).print();`

The class UserSongsStatistics is a POJO with fields named the same as the expressions in the SELECT clause. Is such a construct intended to work? Right now I get an error:

org.apache.flink.table.api.TableException: The field types of physical and logical row types do not match. This is a bug and should not happen. Please file an issue.

Is it really a bug?

Anyway, thanks for the help. I will file a JIRA for the previous issue tomorrow.

2017-06-09 22:25 GMT+02:00 Timo Walther <[hidden email]>:
|