Question tableEnv.toDataSet : Table to DataSet<Tuple2<String,Integer>>


Camelia Elena Ciolac


Good evening,


I have started working with the beta Flink SQL in BatchTableEnvironment, and I would like to convert the resulting Table object into a DataSet<Tuple2<String,Integer>>.

Here are a few lines of code as an example:


DataSet<Tuple4<String,String,Double,Double>> ds1 = ...

tableEnv.registerDataSet("EventsTable", ds1, "event_id,region,latitude,longitude");            

Table result = tableEnv.sql("SELECT region, COUNT(event_id) AS counter FROM EventsTable GROUP BY region");

TypeInformation<Tuple2<String, Integer>> typeInformation = TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() { });
DataSet<Tuple2<String, Integer>> resDS = tableEnv.toDataSet(result, typeInformation);
            
At runtime, I get the following error:


Caused by: org.apache.flink.api.table.codegen.CodeGenException: Incompatible types of expression and result type.
at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:327)
at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:325)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.flink.api.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:325)
at org.apache.flink.api.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:269)
at org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$class.getConversionMapper(DataSetRel.scala:89)
at org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate.getConversionMapper(DataSetAggregate.scala:38)
at org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate.translateToPlan(DataSetAggregate.scala:142)
at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:274)
at org.apache.flink.api.java.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:163)


What is the correct way to achieve this?

According to the resolved JIRA issue FLINK-1991 (https://issues.apache.org/jira/browse/FLINK-1991), this is now possible in Flink 1.1.0.


As a side note, converting to DataSet<Row> works, but converting to a POJO DataSet (again with fields String region and Integer counter) gives the same error as above.


Thanks in advance,

Best regards,

Camelia



Re: Question tableEnv.toDataSet : Table to DataSet<Tuple2<String,Integer>>

Suneel Marthi
I can confirm that the code you have works in Flink 1.1.0.

In reply to the post by Camelia Elena Ciolac of Sat, Aug 20, 2016, 3:37 PM.

Re: Question tableEnv.toDataSet : Table to DataSet<Tuple2<String,Integer>>

Camelia Elena Ciolac
In reply to this post by Camelia Elena Ciolac


Indeed, my source code was correct overall (thank you, Suneel Marthi, for confirming), but the COUNT function in Table SQL returns Long, not Integer.
So the working code is:

TypeInformation<Tuple2<String, Long>> typeInformation = TypeInformation.of(new TypeHint<Tuple2<String, Long>>() { });
DataSet<Tuple2<String, Long>> resDS = tableEnv.toDataSet(result, typeInformation);

Regards,
Camelia
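
The same fix can probably be mirrored for the POJO case mentioned in the original question. The thread does not confirm this, so what follows is only a minimal sketch. It assumes the POJO conversion failed for the same reason, namely that counter was declared as Integer while COUNT yields Long. The class name RegionCount and the helper counterAsInt are hypothetical and do not appear in the thread. The block is plain Java with no Flink dependency, so it only shows the shape of the type and one way to narrow the count when an int is really needed downstream.

```java
// Hypothetical POJO for the query result; the class and helper names are illustrative.
public class RegionCount {
    // Field names match the query's output columns: region, counter.
    public String region;
    // Declared as Long because COUNT returns Long, as reported in the reply above.
    public Long counter;

    // Public no-argument constructor, which POJO-style types generally need.
    public RegionCount() {}

    public RegionCount(String region, Long counter) {
        this.region = region;
        this.counter = counter;
    }

    // Narrows the count for downstream code that needs an int.
    // Math.toIntExact throws ArithmeticException if the value does not fit.
    public int counterAsInt() {
        return Math.toIntExact(counter);
    }

    public static void main(String[] args) {
        RegionCount rc = new RegionCount("north", 3L);
        System.out.println(rc.region + " " + rc.counterAsInt()); // prints "north 3"
    }
}
```

With Flink on the classpath, such a class would presumably be passed as tableEnv.toDataSet(result, RegionCount.class). That call is untested here and is an assumption based on the API described in the thread.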


