Hello,

First time posting, so please let me know if the formatting isn't correct, etc.

I'm trying to left join two Kafka sources, running 1.7.2 locally, but I'm getting the exception below. It looks like it happens during query optimization, but I'm not sure where to start investigating/debugging. I see things are marked as NONE in the object, which is a bit of a flag to me, although I don't know for sure. Any pointers would be much appreciated:

Exception in thread "main" java.lang.RuntimeException: Error while applying rule FlinkLogicalJoinConverter, args [rel#94:LogicalJoin.NONE(left=rel#84:Subset#0.NONE,right=rel#93:Subset#5.NONE,condition==($0, $6),joinType=left)]
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
    at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
    at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
    at org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
    at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
    at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:205)
    at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:185)
    at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:143)
    ...
Caused by: java.lang.RuntimeException: Error occurred while applying rule FlinkLogicalJoinConverter
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:149)
    at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234)
    at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:141)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
    ... 11 more
Caused by: java.lang.NullPointerException
    at org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:84)
    at org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateDataTypeSize(FlinkLogicalJoinBase.scala:29)
    at org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:104)
    at org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateDataTypeSize(FlinkLogicalJoinBase.scala:29)
    at org.apache.flink.table.plan.nodes.FlinkRelNode$$anonfun$estimateRowSize$2.apply(FlinkRelNode.scala:80)
    at org.apache.flink.table.plan.nodes.FlinkRelNode$$anonfun$estimateRowSize$2.apply(FlinkRelNode.scala:79)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
    at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:48)
    at org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateRowSize(FlinkRelNode.scala:79)
    at org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateRowSize(FlinkLogicalJoinBase.scala:29)
    at org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.computeSelfCost(FlinkLogicalJoinBase.scala:51)
    at org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:162)
    at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown Source)
    at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source)
    at org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:301)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:953)
    at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:339)
    at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:322)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1643)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1579)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:859)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:879)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1755)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:135)
Hi Karl,
It seems that some field types of your inputs were not properly extracted. Could you share the result of `printSchema()` for your input tables?

Best,
Xingcan

> On Feb 25, 2019, at 4:35 PM, Karl Jin <[hidden email]> wrote:
Thanks for checking in quickly,

Below is what I got from printSchema() on the two tables (left joining the second one to the first one on uc_pk = i_uc_pk). rowtime in both is extracted from the string field uc_update_ts.

root
 |-- uc_pk: String
 |-- uc_update_ts: String
 |-- rowtime: TimeIndicatorTypeInfo(rowtime)
 |-- uc_version: String
 |-- uc_type: String
 |-- data_parsed: Map<String, String>

root
 |-- i_uc_pk: String
 |-- i_uc_update_ts: TimeIndicatorTypeInfo(rowtime)
 |-- image_count: Long
 |-- i_data: Multiset<Map<String, String>>

On Mon, Feb 25, 2019 at 4:54 PM Xingcan Cui <[hidden email]> wrote:
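[Editorial note: a sketch of the failing join, reconstructed from the schemas above. The table names `uc_table` and `image_table` and the variable names are hypothetical; only the column names and the join condition come from the thread. Requires a Flink 1.7 StreamTableEnvironment with both Kafka-backed tables registered, so this is not a self-contained runnable example.]

```scala
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Left join the image table onto the main table on the key columns
// shown in printSchema(). The NPE is thrown when the plan is translated,
// i.e. at the toRetractStream call (see TableConversions.toRetractStream
// in the stack trace), not when the query is defined.
val joined: Table = tableEnv.sqlQuery(
  """
    |SELECT u.uc_pk, u.uc_update_ts, u.data_parsed,
    |       i.image_count, i.i_data
    |FROM uc_table u
    |LEFT JOIN image_table i ON u.uc_pk = i.i_uc_pk
  """.stripMargin)

joined.toRetractStream[Row].print()
```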
Do you think something funky might be happening with the Map/Multiset types? If so, how do I deal with it? (I think I can verify by removing those columns and retrying.)

On Mon, Feb 25, 2019 at 6:28 PM Karl Jin <[hidden email]> wrote:
Yes, please check that. If it's a problem with the nested type, this might be a bug.

On Mon, Feb 25, 2019, 21:50 Karl Jin <[hidden email]> wrote:
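[Editorial note: one way to run the check suggested above is to project the nested columns away before joining. This is a hedged sketch, not from the thread; the table names are the same hypothetical ones as before.]

```scala
// Re-run the identical join, but on projections that exclude the
// Map and Multiset columns. If the NPE disappears, the nested types
// are implicated in the size-estimation failure.
val leftFlat = tableEnv.sqlQuery(
  "SELECT uc_pk, uc_update_ts, uc_version, uc_type FROM uc_table")
val rightFlat = tableEnv.sqlQuery(
  "SELECT i_uc_pk, image_count FROM image_table")
```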
I removed the multiset<map<string,string>> field and the join worked fine. The field was created from a Kafka source through a query that looks like "select collect(data) as i_data from ... group by pk".

Do you think this is a bug, or is this something I can work around with some configuration?

On Tue, Feb 26, 2019 at 1:20 AM Xingcan Cui <[hidden email]> wrote:
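[Editorial note: for completeness, the aggregation that produced the multiset column, filled out from the fragment quoted above. The "..." in the original query is unknown; `image_source` is a placeholder name, and only `collect(data) as i_data`, `group by pk`, and the `image_count`/`i_uc_pk` columns from the earlier schema are taken from the thread.]

```scala
// COLLECT is Flink SQL's multiset aggregate: collecting a
// Map<String, String> column yields Multiset<Map<String, String>>,
// the type that triggered the planner NPE when joined.
val imageTable = tableEnv.sqlQuery(
  """
    |SELECT pk AS i_uc_pk,
    |       COUNT(*) AS image_count,
    |       COLLECT(data) AS i_data
    |FROM image_source
    |GROUP BY pk
  """.stripMargin)
```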
Hi Karl,
I think this is a bug and created FLINK-11769 to track it. Best, Xingcan
Hi,

Is there any update on this issue? I have hit the same problem as Karl. After I removed the query like "select collect(data) ..." from one of the joined tables, the SQL executed correctly without throwing any NPE.

Best regards,
Tony Wei

Xingcan Cui <[hidden email]> wrote on Wed, Feb 27, 2019 at 12:53 PM:
Hi,

I also found a similar issue here [1].

Best,
Tony Wei

Tony Wei <[hidden email]> wrote on Fri, Jul 19, 2019 at 5:38 PM: