Good evening,
I started working with the beta Flink SQL in BatchTableEnvironment and I would like to convert the resulting Table object into a DataSet<Tuple2<String,Integer>>. Here are a few lines of code as an example:
DataSet<Tuple4<String, String, Double, Double>> ds1 = ...
tableEnv.registerDataSet("EventsTable", ds1, "event_id,region,latitude, longitude");
Table result = tableEnv.sql("SELECT region, COUNT(event_id) AS counter FROM EventsTable GROUP BY region");
TypeInformation<Tuple2<String, Integer>> typeInformation = TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() { });
DataSet<Tuple2<String, Integer>> resDS = tableEnv.toDataSet(result, typeInformation);
At runtime, I get the following error:
Caused by: org.apache.flink.api.table.codegen.CodeGenException: Incompatible types of expression and result type.
    at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:327)
    at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:325)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.flink.api.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:325)
    at org.apache.flink.api.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:269)
    at org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$class.getConversionMapper(DataSetRel.scala:89)
    at org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate.getConversionMapper(DataSetAggregate.scala:38)
    at org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate.translateToPlan(DataSetAggregate.scala:142)
    at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:274)
    at org.apache.flink.api.java.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:163)
What is the correct way to achieve this?
According to the resolved JIRA issue FLINK-1991 (https://issues.apache.org/jira/browse/FLINK-1991), this is now possible (in Flink 1.1.0).
As a side note, converting to DataSet<Row> works, but converting to a POJO DataSet (again with the fields String region and Integer counter) gives the same error as above.
Thanks in advance,
Best regards,
Camelia