Stream sql example

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Stream sql example

Dawid Wysakowicz
Hi,
I tried writing a simple sql query with custom StreamTableSource and it fails with error:

org.apache.flink.table.codegen.CodeGenException: Arity of result type does not match number of expressions.
at org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:940)
at org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:883)
at org.apache.flink.table.plan.nodes.CommonScan$class.generatedConversionFunction(CommonScan.scala:57)
at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.generatedConversionFunction(StreamTableSourceScan.scala:35)
at org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:48)
at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.convertToInternalRow(StreamTableSourceScan.scala:35)
at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:107)

You can check the source code here:
 

Z pozdrowieniami! / Cheers!


Dawid Wysakowicz

Data/Software Engineer

Skype: dawid_wys | Twitter: @OneMoreCoder

Reply | Threaded
Open this post in threaded view
|

Re: Stream sql example

Dawid Wysakowicz

Z pozdrowieniami! / Cheers!


Dawid Wysakowicz

Data/Software Engineer

Skype: dawid_wys | Twitter: @OneMoreCoder


2017-06-09 20:19 GMT+02:00 Dawid Wysakowicz <[hidden email]>:
Hi,
I tried writing a simple sql query with custom StreamTableSource and it fails with error:

org.apache.flink.table.codegen.CodeGenException: Arity of result type does not match number of expressions.
at org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:940)
at org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:883)
at org.apache.flink.table.plan.nodes.CommonScan$class.generatedConversionFunction(CommonScan.scala:57)
at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.generatedConversionFunction(StreamTableSourceScan.scala:35)
at org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:48)
at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.convertToInternalRow(StreamTableSourceScan.scala:35)
at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:107)

You can check the source code here:
 

Z pozdrowieniami! / Cheers!


Dawid Wysakowicz

Data/Software Engineer

Skype: dawid_wys | Twitter: @OneMoreCoder


Reply | Threaded
Open this post in threaded view
|

Re: Stream sql example

Timo Walther
Hi David,

I think the problem is that the type of the DataStream produced by the TableSource, does not match the type that is declared in the ` getReturnType()`. A `MapFunction<xxx, Row>` is always a generic type (because Row cannot be analyzed). A solution would be that the mapper implements `ResultTypeQueryable`. I agree that the error should be thrown earlier, not in the CodeGenerator. Can you create an issue for this?

Btw the Table API supports nested types, it should work that the TableSource returns ` SongEvent`.

Regards,
Timo


Am 09.06.17 um 20:19 schrieb Dawid Wysakowicz:

Z pozdrowieniami! / Cheers!


Dawid Wysakowicz

Data/Software Engineer

Skype: dawid_wys | Twitter: @OneMoreCoder


2017-06-09 20:19 GMT+02:00 Dawid Wysakowicz <[hidden email]>:
Hi,
I tried writing a simple sql query with custom StreamTableSource and it fails with error:

org.apache.flink.table.codegen.CodeGenException: Arity of result type does not match number of expressions.
at org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:940)
at org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:883)
at org.apache.flink.table.plan.nodes.CommonScan$class.generatedConversionFunction(CommonScan.scala:57)
at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.generatedConversionFunction(StreamTableSourceScan.scala:35)
at org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:48)
at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.convertToInternalRow(StreamTableSourceScan.scala:35)
at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:107)

You can check the source code here:
 

Z pozdrowieniami! / Cheers!


Dawid Wysakowicz

Data/Software Engineer

Skype: dawid_wys | Twitter: @OneMoreCoder



Reply | Threaded
Open this post in threaded view
|

Re: Stream sql example

Dawid Wysakowicz
Thanks a lot Timo, after I added the ResultTypeQueryable interface to my mapper it worked. As for the SongEvent the reason I tried remapping it to Row is that it has an enum field on which I want to filter, so my first approach was to remap it in TableSource to String. What do you think should be the way to go in such case? 

After successfully producing DataStream[Row] I tried sth like:
tEnv.toAppendStream(table)(TypeInformation.of(classOf[UserSongsStatistics])).print();

The class UserSongsStatistics is a pojo with fields named the same as expressions in SELECT clause. Is such a construct intended to work? Right now I get an error:

org.apache.flink.table.api.TableException: The field types of physical and logical row types do not match.This is a bug and should not happen. Please file an issue. 

Is it really a bug?

Anyway thanks for help. I will file a JIRA for the previous issue tomorrow. 

Z pozdrowieniami! / Cheers!


Dawid Wysakowicz

Data/Software Engineer

Skype: dawid_wys | Twitter: @OneMoreCoder


2017-06-09 22:25 GMT+02:00 Timo Walther <[hidden email]>:
Hi David,

I think the problem is that the type of the DataStream produced by the TableSource, does not match the type that is declared in the ` getReturnType()`. A `MapFunction<xxx, Row>` is always a generic type (because Row cannot be analyzed). A solution would be that the mapper implements `ResultTypeQueryable`. I agree that the error should be thrown earlier, not in the CodeGenerator. Can you create an issue for this?

Btw the Table API supports nested types, it should work that the TableSource returns ` SongEvent`.

Regards,
Timo


Am 09.06.17 um 20:19 schrieb Dawid Wysakowicz:

Z pozdrowieniami! / Cheers!


Dawid Wysakowicz

Data/Software Engineer

Skype: dawid_wys | Twitter: @OneMoreCoder


2017-06-09 20:19 GMT+02:00 Dawid Wysakowicz <[hidden email]>:
Hi,
I tried writing a simple sql query with custom StreamTableSource and it fails with error:

org.apache.flink.table.codegen.CodeGenException: Arity of result type does not match number of expressions.
at org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:940)
at org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:883)
at org.apache.flink.table.plan.nodes.CommonScan$class.generatedConversionFunction(CommonScan.scala:57)
at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.generatedConversionFunction(StreamTableSourceScan.scala:35)
at org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:48)
at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.convertToInternalRow(StreamTableSourceScan.scala:35)
at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:107)

You can check the source code here:
 

Z pozdrowieniami! / Cheers!


Dawid Wysakowicz

Data/Software Engineer

Skype: dawid_wys | Twitter: @OneMoreCoder