[Problem] Unable to do join on TumblingEventTimeWindows using SQL


[Problem] Unable to do join on TumblingEventTimeWindows using SQL

Manoj Kumar

Hi All,

[Explanation]


Two tables say lineitem and orders:

Table orderstbl=bsTableEnv.fromDataStream(orders,"a,b,c,d,e,f,g,h,i,orders.rowtime");
Table lineitemtbl=bsTableEnv.fromDataStream(lineitem,"a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,lineitem.rowtime");

bsTableEnv.registerTable("Orders",orderstbl);
bsTableEnv.registerTable("Lineitem",lineitemtbl);

# Regular tumble window works

 Table sqlResult = bsTableEnv.sqlQuery("Select count(Orders.a) FROM Orders GROUP BY TUMBLE(orders, INTERVAL '5' SECOND)");
 Table sqlResult = bsTableEnv.sqlQuery("Select count(Lineitem.a) FROM Lineitem GROUP BY TUMBLE(lineitem, INTERVAL '5' SECOND)");

# DataStream TumblingEventTimeWindows join also works fine
lineitem.join(orders).where(...).equalTo(...).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(...)

But when I try to join them over the same window it gives me an error. It's possible I am writing the wrong SQL :(

Table sqlResult  = bsTableEnv.sqlQuery("SELECT    count(Lineitem.a) FROM "
        + "Orders,Lineitem where Lineitem.a=Orders.a "
        + "GROUP BY TUMBLE(orders, INTERVAL '5' SECOND)");

Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:

FlinkLogicalSink(name=[sink], fields=[b])
+- FlinkLogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[])
   +- FlinkLogicalCalc(select=[orders, a0])
      +- FlinkLogicalJoin(condition=[=($2, $0)], joinType=[inner])
         :- FlinkLogicalCalc(select=[a, orders])
         :  +- FlinkLogicalDataStreamTableScan(table=[[Unregistered_DataStream_3]])
         +- FlinkLogicalCalc(select=[a])
            +- FlinkLogicalDataStreamTableScan(table=[[Unregistered_DataStream_6]])

Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
Please check the documentation for the set of currently supported SQL features.
at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:166)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:88)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:78)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:212)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:147)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:327)
at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
at bzflink.StreamingTable.main(StreamingTable.java:65)
Caused by: org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
at org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecJoinRule.matches(StreamExecJoinRule.scala:88)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:367)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:367)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
at org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1522)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1795)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1656)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1656)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1656)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
at org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:325)
at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
... 20 more

Process finished with exit code 1

--
Regards,
Manoj  Kumar

Re: [Problem] Unable to do join on TumblingEventTimeWindows using SQL

Fabian Hueske-2
Hi,

the exception says: "Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.".

The problem is that your query first joins the two tables without a temporal condition and then wants to do a windowed grouping.
A join without a temporal condition cannot preserve the rowtime attribute.
You should try to change the join into a time-windowed join [1] [2] by adding a BETWEEN predicate on the rowtime attributes of both tables.
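A time-windowed join along those lines might look like the following. This is only a sketch, not a tested query: it assumes the rowtime attributes are the columns named "orders" and "lineitem" (as declared in the fromDataStream expressions above) and joins on column "a":

SELECT
    TUMBLE_START(o.orders, INTERVAL '5' SECOND),
    COUNT(l.a)
FROM Orders o, Lineitem l
WHERE
    l.a = o.a AND
    -- the BETWEEN predicate on both rowtime attributes makes this a
    -- time-windowed join, so the rowtime attribute is preserved
    l.lineitem BETWEEN o.orders - INTERVAL '5' SECOND AND o.orders
GROUP BY TUMBLE(o.orders, INTERVAL '5' SECOND)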

Best, Fabian


On Wed, Oct 23, 2019 at 09:18, Manoj Kumar <[hidden email]> wrote:


Re: [Problem] Unable to do join on TumblingEventTimeWindows using SQL

jeremyji
Hi Fabian, I used a time-windowed join according to the docs you gave, but I still
have the problem.
Here is what my Flink SQL looks like:

SELECT
        a.account account,
        SUM(a.value) + SUM(b.value),
        UNIX_TIMESTAMP(TUMBLE_START(a.producer_timestamp, INTERVAL '3' MINUTE))
FROM
        (SELECT
                account,
                value,
                producer_timestamp
        FROM
                table1) a,
        (SELECT
                account,
                value,
                producer_timestamp
        FROM
                table2) b
WHERE
        a.account = b.account AND
        a.producer_timestamp BETWEEN b.producer_timestamp - INTERVAL '3' MINUTE AND b.producer_timestamp
GROUP BY
        a.account,
        TUMBLE(a.producer_timestamp, INTERVAL '3' MINUTE)

The exception is almost the same:
Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
Please check the documentation for the set of currently supported SQL features.
        at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:450)
        at org.apache.flink.table.api.TableEnvironment.optimizePhysicalPlan(TableEnvironment.scala:369)
        at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:814)
        at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
        at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
        at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:1048)
        at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:962)
        at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:922)

I think I am using a time-windowed join, but Flink tells me it is a regular join. Is
there anything wrong that I haven't noticed?

Jeremy


