Good evening,
I started working with the beta Flink SQL in BatchTableEnvironment and I would like to convert the resulting Table object into a DataSet<Tuple2<String,Integer>>. Here are a few lines of code as an example:
DataSet<Tuple4<String,String,Double,Double>> ds1 = ...
tableEnv.registerDataSet("EventsTable", ds1, "event_id,region,latitude,longitude");
Table result = tableEnv.sql("SELECT region, COUNT(event_id) AS counter FROM EventsTable GROUP BY region");
TypeInformation<Tuple2<String, Integer>> typeInformation = TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() { });
DataSet<Tuple2<String, Integer>> resDS = tableEnv.toDataSet(result, typeInformation);
The last line fails with the following exception:
Caused by: org.apache.flink.api.table.codegen.CodeGenException: Incompatible types of expression and result type.
at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:327)
at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:325)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.flink.api.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:325)
at org.apache.flink.api.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:269)
at org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$class.getConversionMapper(DataSetRel.scala:89)
at org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate.getConversionMapper(DataSetAggregate.scala:38)
at org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate.translateToPlan(DataSetAggregate.scala:142)
at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:274)
at org.apache.flink.api.java.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:163)
What is the correct way to achieve this? According to the resolved JIRA issue FLINK-1991 (https://issues.apache.org/jira/browse/FLINK-1991), this is now possible (in Flink 1.1.0).
As a side note, converting to DataSet<Row> works, but converting to a POJO dataset (again with fields String region and Integer counter) gives the same error as above.
Thanks in advance, Best regards, Camelia
|
I can confirm that the code you have works in Flink 1.1.0.
On Sat, Aug 20, 2016 at 3:37 PM, Camelia Elena Ciolac <[hidden email]> wrote:
|
So the working code is (COUNT produces a Long, not an Integer, so the result type has to be Tuple2<String, Long>):
TypeInformation<Tuple2<String, Long>> typeInformation = TypeInformation.of(new TypeHint<Tuple2<String, Long>>() { });
DataSet<Tuple2<String, Long>> resDS = tableEnv.toDataSet(result, typeInformation);
Regards, Camelia
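The POJO conversion mentioned in the side note presumably fails for the same reason: COUNT produces a Long, so an Integer counter field does not match the result type. Below is a minimal sketch of a POJO that should line up with the query's result columns. It assumes Flink's usual POJO rules (public class, public no-argument constructor, public fields or getters/setters). The class name RegionCount is hypothetical, and the thread does not confirm that this variant was tested.

```java
// Hypothetical POJO for the result of:
//   SELECT region, COUNT(event_id) AS counter FROM EventsTable GROUP BY region
// Assumption: field names match the query's output column names.
public class RegionCount {
    // Grouping key of the query.
    public String region;
    // COUNT yields a Long, so this field is Long rather than Integer.
    public Long counter;

    // Flink's POJO type extraction expects a public no-argument constructor.
    public RegionCount() {}

    // Convenience constructor for building instances by hand.
    public RegionCount(String region, Long counter) {
        this.region = region;
        this.counter = counter;
    }

    @Override
    public String toString() {
        return region + "=" + counter;
    }
}
```

With such a class, the conversion would presumably be written as tableEnv.toDataSet(result, RegionCount.class), in the same way as the Tuple2<String, Long> version above.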
From: Camelia Elena Ciolac
Sent: Saturday, August 20, 2016 9:37 PM
To: [hidden email]
Subject: Question tableEnv.toDataSet : Table to DataSet<Tuple2<String,Integer>>