Hi I have encountered a problem with Flink SQL. My code: DataSet<MarketDataInfo> dataSet0 = env.fromCollection( infos0 ); tableEnv.registerDataSet( "table0", dataSet0 ); String sql = "select closePrice from table0" Table table = tableEnv.sql( sql ); tableEnv.registerTable( tableName, table ); DataSet<Row> redyData = tableEnv.toDataSet( table, Row.class ); This works fine. But when I change SQL to "select distinct closePrice from table0" “tableEnv.toDataSet” throws exception: java.lang.AssertionError: Internal error: Error occurred while applying rule DataSetAggregateRule at org.apache.calcite.util.Util.newInternal(Util.java:792) at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:149) at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:225) at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:118) at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:214) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:825) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:334) at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:253) at org.apache.flink.api.java.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:146) at com.streamingedge.marketreport.analytics.flink.FlinkDataSetAnalytics.analize(FlinkDataSetAnalytics.java:96) at com.streamingedge.marketreport.webserver.AnalyticsServlet.processRequest(AnalyticsServlet.java:117) at com.streamingedge.marketreport.webserver.AnalyticsServlet.doPost(AnalyticsServlet.java:40) at com.streamingedge.marketreport.webserver.AnalyticsServlet.doGet(AnalyticsServlet.java:35) at javax.servlet.http.HttpServlet.service(HttpServlet.java:707) at javax.servlet.http.HttpServlet.service(HttpServlet.java:820) at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:684) at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501) at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:229) at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086) at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:427) at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:193) at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020) at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135) at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255) at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116) at org.eclipse.jetty.server.Server.handle(Server.java:366) at org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494) at org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:973) at org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1035) at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:641) at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:231) at org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82) at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:696) at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:53) at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608) at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543) at java.lang.Thread.run(Unknown Source) Caused by: org.apache.flink.api.table.TableException: Unsupported data type encountered at org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$$anonfun$estimateRowSize$2.apply(DataSetRel.scala:65) at org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$$anonfun$estimateRowSize$2.apply(DataSetRel.scala:53) at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51) at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60) at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:47) at org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$class.estimateRowSize(DataSetRel.scala:53) at org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate.estimateRowSize(DataSetAggregate.scala:38) at org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate.computeSelfCost(DataSetAggregate.scala:80) at org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:162) at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown Source) at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source) at org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:258) at org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:1134) at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:336) at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:319) at org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1838) at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1774) at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:1038) at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1058) at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1950) at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:137) ... 35 more |
Free forum by Nabble | Edit this page |