left join failing with FlinkLogicalJoinConverter NPE


left join failing with FlinkLogicalJoinConverter NPE

Karl Jin
Hello,

First time posting, so please let me know if the formatting isn't correct, etc.

I'm trying to left join two Kafka sources, running Flink 1.7.2 locally, but I'm getting the exception below. It looks like the failure happens during query optimization, but I'm not sure where to start investigating/debugging. I notice the relations are marked as NONE in the rule arguments, which seems like a red flag to me, although I don't know for sure. Any pointers would be much appreciated:

Exception in thread "main" java.lang.RuntimeException: Error while applying rule FlinkLogicalJoinConverter, args [rel#94:LogicalJoin.NONE(left=rel#84:Subset#0.NONE,right=rel#93:Subset#5.NONE,condition==($0, $6),joinType=left)]
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
    at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
    at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
    at org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
    at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
    at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:205)
    at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:185)
    at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:143)
    ...
Caused by: java.lang.RuntimeException: Error occurred while applying rule FlinkLogicalJoinConverter
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:149)
    at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234)
    at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:141)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
    ... 11 more
Caused by: java.lang.NullPointerException
    at org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:84)
    at org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateDataTypeSize(FlinkLogicalJoinBase.scala:29)
    at org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:104)
    at org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateDataTypeSize(FlinkLogicalJoinBase.scala:29)
    at org.apache.flink.table.plan.nodes.FlinkRelNode$$anonfun$estimateRowSize$2.apply(FlinkRelNode.scala:80)
    at org.apache.flink.table.plan.nodes.FlinkRelNode$$anonfun$estimateRowSize$2.apply(FlinkRelNode.scala:79)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
    at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:48)
    at org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateRowSize(FlinkRelNode.scala:79)
    at org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateRowSize(FlinkLogicalJoinBase.scala:29)
    at org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.computeSelfCost(FlinkLogicalJoinBase.scala:51)
    at org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:162)
    at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown Source)
    at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source)
    at org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:301)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:953)
    at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:339)
    at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:322)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1643)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1579)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:859)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:879)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1755)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:135)
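
The shape of the program, roughly, is two registered Kafka-backed tables, a left outer join, and a conversion to a retract stream. Below is a simplified sketch of that setup using the Flink 1.7 Scala Table API (this is a reconstruction from the stack trace, not the actual job; the table names are placeholders):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val env  = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// Assume two Kafka-backed tables have already been registered.
val updates = tEnv.scan("updates") // placeholder name
val images  = tEnv.scan("images")  // placeholder name

// Left outer join on the key. The NPE above is thrown while the Volcano
// planner optimizes this plan during translation in toRetractStream.
val joined = updates.leftOuterJoin(images, 'uc_pk === 'i_uc_pk)

tEnv.toRetractStream[Row](joined).print()
env.execute()
```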

Re: left join failing with FlinkLogicalJoinConverter NPE

Xingcan Cui
Hi Karl,

It seems that some field types of your inputs were not properly extracted.
Could you share the result of `printSchema()` for your input tables?

Best,
Xingcan

> On Feb 25, 2019, at 4:35 PM, Karl Jin <[hidden email]> wrote:


Re: left join failing with FlinkLogicalJoinConverter NPE

Karl Jin
Thanks for checking in quickly,

Below is the printSchema() output for the two tables (I'm left joining the second one to the first on uc_pk = i_uc_pk). The rowtime in both tables is extracted from the string field uc_update_ts:

root
 |-- uc_pk: String
 |-- uc_update_ts: String
 |-- rowtime: TimeIndicatorTypeInfo(rowtime)
 |-- uc_version: String
 |-- uc_type: String
 |-- data_parsed: Map<String, String>

root
 |-- i_uc_pk: String
 |-- i_uc_update_ts: TimeIndicatorTypeInfo(rowtime)
 |-- image_count: Long
 |-- i_data: Multiset<Map<String, String>>

On Mon, Feb 25, 2019 at 4:54 PM Xingcan Cui <[hidden email]> wrote:


Re: left join failing with FlinkLogicalJoinConverter NPE

Karl Jin
Do you think something funky might be happening with the Map/Multiset types? If so, how do I deal with it? (I think I can verify this by removing those columns and retrying.)

On Mon, Feb 25, 2019 at 6:28 PM Karl Jin <[hidden email]> wrote:


Re: left join failing with FlinkLogicalJoinConverter NPE

Xingcan Cui
Yes, please check that. If it's a problem with the nested type, this might be a bug.
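
One way to run that check is to project away the collection-typed columns before retrying the same join. A sketch against the schemas posted above (`updates` and `images` stand in for however the two table handles are named in the actual job):

```scala
// Sketch: drop the Map/Multiset columns from each side, then retry the join.
// If the join succeeds without them, the nested types are the trigger.
val updatesSlim = updates.select('uc_pk, 'uc_update_ts, 'rowtime, 'uc_version, 'uc_type) // drops data_parsed
val imagesSlim  = images.select('i_uc_pk, 'i_uc_update_ts, 'image_count)                 // drops i_data

val joinedSlim = updatesSlim.leftOuterJoin(imagesSlim, 'uc_pk === 'i_uc_pk)
```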

On Mon, Feb 25, 2019, 21:50 Karl Jin <[hidden email]> wrote:


Re: left join failing with FlinkLogicalJoinConverter NPE

Karl Jin
I removed the Multiset<Map<String, String>> field and the join worked fine. The field was created from a Kafka source through a query that looks like `select collect(data) as i_data from ... group by pk`.

Do you think this is a bug or is this something I can get around using some configuration?
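
For context, the aggregation that produced the Multiset column looks roughly like this (a sketch only: `images_raw` and `pk` are placeholders, and the elided FROM clause from the original message is not reproduced here). COLLECT over a Map<String, String> column is what yields the Multiset<Map<String, String>> type:

```scala
// Sketch of the aggregation that yields the Multiset<Map<String, String>>
// column. "images_raw" is a placeholder for the Kafka-backed source table.
val imageAgg = tEnv.sqlQuery(
  """
    |SELECT pk AS i_uc_pk,
    |       COUNT(*) AS image_count,
    |       COLLECT(data) AS i_data
    |FROM images_raw
    |GROUP BY pk
  """.stripMargin)
```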

On Tue, Feb 26, 2019 at 1:20 AM Xingcan Cui <[hidden email]> wrote:
>       at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1755)
>       at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:135)
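The NPE at the bottom of that trace comes from the planner's row-size estimation. A minimal, self-contained sketch of the general failure mode (this is NOT Flink's actual code, just an illustration): a per-field size estimator returns `null` for a composite type it does not fully handle, and summing the field sizes then auto-unboxes that `null`, throwing the `NullPointerException`.

```java
import java.util.List;

// Sketch of the failure mode (not Flink's actual code): a per-field
// size estimator that returns null for a composite type it does not
// handle, e.g. MULTISET<MAP<STRING, STRING>>. Summing the per-field
// sizes then auto-unboxes that null Double, which throws the NPE seen
// in estimateRowSize/estimateDataTypeSize above.
public class RowSizeSketch {
    static Double estimateDataTypeSize(String typeName) {
        switch (typeName) {
            case "STRING":    return 12.0;
            case "LONG":      return 8.0;
            case "TIMESTAMP": return 12.0;
            default:          return null; // unhandled composite type
        }
    }

    static double estimateRowSize(List<String> fieldTypes) {
        double total = 0.0;
        for (String t : fieldTypes) {
            total += estimateDataTypeSize(t); // unboxing null -> NPE
        }
        return total;
    }

    public static void main(String[] args) {
        // Plain fields sum without trouble:
        System.out.println(estimateRowSize(List.of("STRING", "LONG")));
        // A field whose size cannot be estimated blows up mid-fold:
        try {
            estimateRowSize(List.of("STRING", "MULTISET<MAP<STRING, STRING>>"));
        } catch (NullPointerException e) {
            System.out.println("NPE during row-size estimation");
        }
    }
}
```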


Re: left join failing with FlinkLogicalJoinConverter NPE

Xingcan Cui
Hi Karl,

I think this is a bug and created FLINK-11769 to track it.

Best,
Xingcan

On Feb 26, 2019, at 2:02 PM, Karl Jin <[hidden email]> wrote:

I removed the multiset<map<string,string>> field and the join worked fine. The field was created from a Kafka source through a query that looks like "select collect(data) as i_data from ... group by pk"

Do you think this is a bug or is this something I can get around using some configuration?
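One hypothetical workaround (untested against this bug, so not a confirmed fix) is to serialize each MAP<STRING, STRING> value to a plain string, e.g. in a scalar UDF, before the COLLECT, so the aggregated column becomes MULTISET<STRING> rather than MULTISET<MAP<...>>. The serialization core might look like:

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical workaround sketch: serialize each MAP<STRING, STRING>
// value to a stable string before aggregating with COLLECT, so the
// aggregate column is MULTISET<STRING> instead of MULTISET<MAP<...>>.
// Untested idea, not a confirmed fix for the planner NPE.
public class MapSerializer {
    static String serialize(Map<String, String> data) {
        // TreeMap imposes a deterministic key order, so equal maps
        // always serialize to the same string.
        return new TreeMap<>(data).entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(","));
    }
}
```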

On Tue, Feb 26, 2019 at 1:20 AM Xingcan Cui <[hidden email]> wrote:
Yes. Please check that. If it's the nested type's problem, this might be a bug.

On Mon, Feb 25, 2019, 21:50 Karl Jin <[hidden email]> wrote:
Do you think something funky might be happening with the Map/Multiset types? If so, how do I deal with it? (I think I can verify by removing those columns and retrying.)

On Mon, Feb 25, 2019 at 6:28 PM Karl Jin <[hidden email]> wrote:
Thanks for checking in quickly,

Below is what I got from printSchema on the two tables (left-joining the second one to the first on uc_pk = i_uc_pk). rowtime in both is extracted from the string field uc_update_ts.

root
 |-- uc_pk: String
 |-- uc_update_ts: String
 |-- rowtime: TimeIndicatorTypeInfo(rowtime)
 |-- uc_version: String
 |-- uc_type: String
 |-- data_parsed: Map<String, String>

root
 |-- i_uc_pk: String
 |-- i_uc_update_ts: TimeIndicatorTypeInfo(rowtime)
 |-- image_count: Long
 |-- i_data: Multiset<Map<String, String>>

On Mon, Feb 25, 2019 at 4:54 PM Xingcan Cui <[hidden email]> wrote:
Hi Karl,

It seems that some field types of your inputs were not properly extracted.
Could you share the result of `printSchema()` for your input tables?

Best,
Xingcan

> On Feb 25, 2019, at 4:35 PM, Karl Jin <[hidden email]> wrote:



Re: left join failing with FlinkLogicalJoinConverter NPE

Tony Wei
Hi,

Is there any update on this issue? I have hit the same problem as Karl.
After removing the "select collect(data) ..." query from one of the joined tables,
the SQL executes correctly without throwing any NPE.

Best regards,
Tony Wei
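Before removing columns one by one, the likely offenders can be spotted mechanically. A small diagnostic sketch over a name-to-type view of a table schema (the column names and type strings below are copied from the printSchema output earlier in this thread):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Diagnostic sketch: given a name -> type-string view of a join input,
// flag the columns whose type involves MULTISET, since those are the
// ones implicated in this NPE.
public class SchemaCheck {
    static List<String> suspectColumns(Map<String, String> schema) {
        return schema.entrySet().stream()
                .filter(e -> e.getValue().toUpperCase().contains("MULTISET"))
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }
}
```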

Xingcan Cui <[hidden email]> wrote on Wed, Feb 27, 2019 at 12:53 PM:



Re: left join failing with FlinkLogicalJoinConverter NPE

Tony Wei
Hi,

I also found a similar issue reported here [1].

Best,
Tony Wei

Tony Wei <[hidden email]> wrote on Fri, Jul 19, 2019 at 5:38 PM: