Flink SQL problem

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Flink SQL problem

Davran Muzafarov

Hi I have encountered a problem with Flink SQL.

 

My code:

 

DataSet<MarketDataInfo> dataSet0 = env.fromCollection( infos0 );

tableEnv.registerDataSet( "table0", dataSet0 );

 

 

String sql = "select closePrice from table0"

 

Table table = tableEnv.sql( sql );

tableEnv.registerTable( tableName, table );

 

 

DataSet<Row> redyData = tableEnv.toDataSet( table, Row.class );

 

This works fine.

 

But when I change SQL to "select distinct closePrice from table0" “tableEnv.toDataSet” throws exception:

 

java.lang.AssertionError: Internal error: Error occurred while applying rule DataSetAggregateRule

                at org.apache.calcite.util.Util.newInternal(Util.java:792)

                at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:149)

                at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:225)

                at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:118)

                at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:214)

                at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:825)

                at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:334)

                at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:253)

                at org.apache.flink.api.java.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:146)

                at com.streamingedge.marketreport.analytics.flink.FlinkDataSetAnalytics.analize(FlinkDataSetAnalytics.java:96)

                at com.streamingedge.marketreport.webserver.AnalyticsServlet.processRequest(AnalyticsServlet.java:117)

                at com.streamingedge.marketreport.webserver.AnalyticsServlet.doPost(AnalyticsServlet.java:40)

                at com.streamingedge.marketreport.webserver.AnalyticsServlet.doGet(AnalyticsServlet.java:35)

                at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)

                at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)

                at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)

                at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)

                at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:229)

                at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)

                at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:427)

                at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:193)

                at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)

                at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)

                at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255)

                at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)

                at org.eclipse.jetty.server.Server.handle(Server.java:366)

                at org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)

                at org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:973)

                at org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1035)

                at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:641)

                at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:231)

                at org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)

                at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:696)

                at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:53)

                at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)

                at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)

                at java.lang.Thread.run(Unknown Source)

Caused by: org.apache.flink.api.table.TableException: Unsupported data type encountered

                at org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$$anonfun$estimateRowSize$2.apply(DataSetRel.scala:65)

                at org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$$anonfun$estimateRowSize$2.apply(DataSetRel.scala:53)

                at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)

                at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)

                at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:47)

                at org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$class.estimateRowSize(DataSetRel.scala:53)

                at org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate.estimateRowSize(DataSetAggregate.scala:38)

                at org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate.computeSelfCost(DataSetAggregate.scala:80)

                at org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:162)

                at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown Source)

                at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source)

                at org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:258)

                at org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:1134)

                at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:336)

                at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:319)

                at org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1838)

                at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1774)

                at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:1038)

                at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1058)

                at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1950)

                at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:137)

                ... 35 more