Is java.sql.Timestamp fully supported in Flink SQL?



Davran Muzafarov

I have two tables created from data sets:

List<MarketDataInfo> infos0 = .....
List<MarketDataInfo> infos1 = .....

DataSet<MarketDataInfo> dataSet0 = env.fromCollection( infos0 );
DataSet<MarketDataInfo> dataSet1 = env.fromCollection( infos1 );

tableEnv.registerDataSet( "table0", dataSet0 );
tableEnv.registerDataSet( "table1", dataSet1 );

Table table = tableEnv.sql( "select * from table0 union select * from table1" );

DataSet<Row> readyData = tableEnv.toDataSet( table, Row.class );

 

 

If MarketDataInfo has only String, Float, or Integer fields, toDataSet works.

If MarketDataInfo has a Timestamp field, I get:

Internal error: Error occurred while applying rule DataSetAggregateRule
        at org.apache.calcite.util.Util.newInternal(Util.java:792)
        at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:149)
        at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:225)
        at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:118)
        at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:214)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:825)
        at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:334)
        at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:253)
        at org.apache.flink.api.java.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:146)
        ...
Caused by: org.apache.flink.api.table.TableException: Unsupported data type encountered
        at org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$$anonfun$estimateRowSize$2.apply(DataSetRel.scala:65)
        at org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$$anonfun$estimateRowSize$2.apply(DataSetRel.scala:53)
        at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
        at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
        at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:47)
        at org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$class.estimateRowSize(DataSetRel.scala:53)
        at org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate.estimateRowSize(DataSetAggregate.scala:38)
        at org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate.computeSelfCost(DataSetAggregate.scala:80)
        at org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:162)
        at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown Source)
        at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source)
        at org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:258)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:1134)
        at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:336)
        at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:319)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1838)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1774)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:1038)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1058)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1950)
        at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:137)
        ... 35 more

Am I missing something?

Thank you,
Davran.


Re: Is java.sql.Timestamp fully supported in Flink SQL?

Timo Walther
Hi Davran,

Unfortunately, you have found a bug. I created an issue for it (https://issues.apache.org/jira/browse/FLINK-4385). As a workaround, you could cast the timestamp to a long value:

Table table0 = tableEnv.fromDataSet(dataSet0);
Table table1 = tableEnv.fromDataSet(dataSet1);
// here "t" stands for the Timestamp field of MarketDataInfo
Table table = table0.select("t.cast(LONG)").union(table1.select("t.cast(LONG)"));
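A complementary sketch, not from the thread: the same "timestamp as long" idea can be applied on the Java side before the DataSets are registered, assuming the planner only trips over the Timestamp type itself (as the "Unsupported data type encountered" cause suggests). The field names `symbol` and `t`, the classes `MarketDataInfo` (a stand-in, since the real class is not shown) and `MarketDataInfoMillis`, and the helpers `toMillis`, `fromMillis`, and `unionDistinct` are all hypothetical. The code uses only the JDK; in a Flink job, the body of `toMillis` could be wrapped in a `MapFunction` and applied with `DataSet.map(...)` before `registerDataSet(...)`.

```java
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

public class TimestampAsLongSketch {

    // Hypothetical stand-in for MarketDataInfo; the real class is not shown in the thread.
    public static class MarketDataInfo {
        public String symbol;
        public Timestamp t;

        public MarketDataInfo(String symbol, Timestamp t) {
            this.symbol = symbol;
            this.t = t;
        }
    }

    // Hypothetical variant that stores the timestamp as epoch milliseconds.
    public static class MarketDataInfoMillis {
        public String symbol;
        public long tMillis;

        public MarketDataInfoMillis(String symbol, long tMillis) {
            this.symbol = symbol;
            this.tMillis = tMillis;
        }

        // Value equality, so duplicate rows collapse in a distinct union.
        @Override
        public boolean equals(Object o) {
            if (!(o instanceof MarketDataInfoMillis)) return false;
            MarketDataInfoMillis other = (MarketDataInfoMillis) o;
            return tMillis == other.tMillis && Objects.equals(symbol, other.symbol);
        }

        @Override
        public int hashCode() {
            return Objects.hash(symbol, tMillis);
        }
    }

    // Per-record conversion; in a Flink job this body could sit inside a MapFunction.
    public static MarketDataInfoMillis toMillis(MarketDataInfo in) {
        return new MarketDataInfoMillis(in.symbol, in.t.getTime());
    }

    // Converts back after the query; sub-millisecond nanos are not recoverable.
    public static Timestamp fromMillis(long millis) {
        return new Timestamp(millis);
    }

    // Duplicate-eliminating union, i.e. what SQL UNION (without ALL) computes.
    public static Set<MarketDataInfoMillis> unionDistinct(List<MarketDataInfoMillis> left,
                                                          List<MarketDataInfoMillis> right) {
        Set<MarketDataInfoMillis> result = new LinkedHashSet<>(left);
        result.addAll(right);
        return result;
    }

    public static void main(String[] args) {
        Timestamp ts = Timestamp.valueOf("2016-08-11 18:28:00.123");
        MarketDataInfo a = new MarketDataInfo("ABC", ts);
        MarketDataInfo b = new MarketDataInfo("ABC", new Timestamp(ts.getTime()));
        MarketDataInfo c = new MarketDataInfo("XYZ", ts);

        Set<MarketDataInfoMillis> union = unionDistinct(
                Arrays.asList(toMillis(a)),
                Arrays.asList(toMillis(b), toMillis(c)));

        System.out.println(union.size());                               // prints 2
        System.out.println(fromMillis(toMillis(a).tMillis).equals(ts)); // prints true
    }
}
```

The design trade-off is precision: `Timestamp.getTime()` keeps whole milliseconds, so a value such as `...00.123456789` comes back from `new Timestamp(millis)` with nanos `123000000`. If sub-millisecond precision matters, the nanos would have to be carried in a separate field.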

I hope that helps. Sorry for the inconvenience.

Timo


On 11/08/16 at 18:28, Davran Muzafarov wrote:



