When I try to refactor my joins into a temporary view, so that joins and state can be shared, I get the following error. I tried a few variations of the code snippets below (adding TIMESTAMP casts based on Google searches). I removed a bunch of fields to simplify this example.

Is this a known issue? Do I have a simple coding bug?

CREATE TEMPORARY VIEW `flat_impression_view` AS
SELECT
  DATE_FORMAT(input_impression.ts, 'yyyy-MM-dd') AS dt,
  input_insertion.log_user_id AS insertion_log_user_id,
  COALESCE(CAST(input_insertion.ts AS TIMESTAMP(3)), CAST(0 AS TIMESTAMP(3))) AS insertion_ts,
  input_insertion.insertion_id AS insertion_insertion_id,
  COALESCE(CAST(input_impression.ts AS TIMESTAMP(3)), CAST(0 AS TIMESTAMP(3))) AS impression_ts,
  input_impression.impression_id AS impression_impression_id,
  input_impression.insertion_id AS impression_insertion_id
FROM input_insertion
JOIN input_impression
  ON input_insertion.insertion_id = input_impression.insertion_id
 AND CAST(input_insertion.ts AS TIMESTAMP)
     BETWEEN CAST(input_impression.ts AS TIMESTAMP) - INTERVAL '12' HOUR
         AND CAST(input_impression.ts AS TIMESTAMP) + INTERVAL '1' HOUR

INSERT INTO `flat_impression_w_click`
SELECT
  dt,
  insertion_log_user_id,
  CAST(insertion_ts AS TIMESTAMP(3)) AS insertion_ts,
  insertion_insertion_id,
  CAST(impression_ts AS TIMESTAMP(3)) AS impression_ts,
  impression_impression_id,
  impression_insertion_id,
  COALESCE(CAST(input_click.ts AS TIMESTAMP(3)), CAST(0 AS TIMESTAMP(3))) AS click_ts,
  COALESCE(input_click.click_id, EmptyByteArray()) AS click_click_id,
  COALESCE(input_click.impression_id, EmptyByteArray()) AS click_impression_id
FROM flat_impression_view
LEFT JOIN input_click
  ON flat_impression_view.impression_impression_id = input_click.impression_id
 AND CAST(flat_impression_view.impression_ts AS TIMESTAMP)
     BETWEEN CAST(input_click.ts AS TIMESTAMP) - INTERVAL '12' HOUR
         AND CAST(input_click.ts AS TIMESTAMP) + INTERVAL '12' HOUR
java.lang.RuntimeException: Failed to executeSql=...
...
Caused by: org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:

FlinkLogicalLegacySink(name=[...])
+- FlinkLogicalCalc(select=[...])
   +- FlinkLogicalJoin(condition=[AND(=($36, $45), >=(CAST($35):TIMESTAMP(6) NOT NULL, -(CAST($43):TIMESTAMP(6), 43200000:INTERVAL HOUR)), <=(CAST($35):TIMESTAMP(6) NOT NULL, +(CAST($43):TIMESTAMP(6), 43200000:INTERVAL HOUR)))], joinType=[left])
      :- FlinkLogicalCalc(select=[...])
      :  +- FlinkLogicalJoin(condition=[AND(=($5, $35), >=(CAST($4):TIMESTAMP(6), -(CAST($33):TIMESTAMP(6), 43200000:INTERVAL HOUR)), <=(CAST($4):TIMESTAMP(6), +(CAST($33):TIMESTAMP(6), 3600000:INTERVAL HOUR)))], joinType=[inner])
      :     :- FlinkLogicalDataStreamTableScan(table=[[default, mydb, input_insertion]])
      :     +- FlinkLogicalDataStreamTableScan(table=[[default, mydb, input_impression]])
      +- FlinkLogicalDataStreamTableScan(table=[[default, mydb, input_click]])

Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
Please check the documentation for the set of currently supported SQL features.
	at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)
	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:80)
	at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
	at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
	at com.example.logprocessor.common.flink.TableEnvUtils.executeSql(TableEnvUtils.java:14)
	... 24 more
Caused by: org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
	at org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecJoinRule.matches(StreamExecJoinRule.scala:88)
	at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:272)
	at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:386)
	at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:386)
	at org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:256)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1476)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1742)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:851)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:866)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:91)
	at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:321)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1611)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:851)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:866)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:91)
	at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:321)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1611)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:851)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:866)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:534)
	at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
	... 45 more
Hi Dan,
are you intending to use interval joins, regular joins, or a mixture of both? For regular joins, you must cast the rowtime attribute to TIMESTAMP as early as possible. For interval joins, you need to make sure the rowtime attribute stays unmodified. Currently, I see COALESCE(CAST(input_insertion.ts AS TIMESTAMP(3)), CAST(0 AS TIMESTAMP(3))) AS insertion_ts and CAST(flat_impression_view.impression_ts AS TIMESTAMP), both of which implicitly disable interval joins. If you would like to keep the interval join properties, you need to do the casting in a computed column in the CREATE TABLE statement, before declaring a watermark on it.

Regards,
Timo

On 15.12.20 18:47, Dan Hill wrote: