I'm trying to group some data and then enrich it by joining with a temporal table function, however my test code (attached) is failing with the error shown below. Can someone please give me a clue as to what I'm doing wrong? at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2481) at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2459) at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112) at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:151) at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:100) at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:34) at org.apache.calcite.rex.RexCall.accept(RexCall.java:107) at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:279) at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:241) at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:259) at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1605) at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:230) at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:344) at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339) at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:374) at org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292) at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812) at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860) at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:340) at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:272) at test.Test.main(Test.java:78) Test.java (8K) Download Attachment |
Hi Chris,
the exception message is a bit misleading. The time attribute (time indicator) type is an internal type and should not be used by users. The following line should solve your issue. Instead of: DataStream<Tuple2<Boolean, Row>> tradesByInstrStream = tableEnv.toRetractStream(tradesByInstr, typeInfo); You can do DataStream<Tuple2<Boolean, Row>> tradesByInstrStream = tableEnv.toRetractStream(tradesByInstr, Row.class); The API will automatically insert the right types for the table passed when using a plain `Row.class`. I hope this helps. Regards, Timo Am 25.01.19 um 20:14 schrieb Chris
Miller:
|
Thanks Timo, I didn't realise supplying Row could automatically apply the correct types. In this case your suggestion doesn't solve the problem though, I still get the exact same error. I assume that's because there isn't a time attribute type on the tradesByInstr table itself, but rather on the groupedTrades table that it joins with. System.out.println(tradesByInstr.getSchema().toRowType()) outputs: -> Row(InstrumentId: Integer, Name: String, ClosePrice: Double, TradeCount: Long, Quantity: Double, Cost: Double) System.out.println(groupedTrades.getSchema().toRowType()) outputs: Looking at the stack trace it seems the query optimiser is tripping up on the LastTrade_EventTime column, but that is required for the temporal table join. Any other ideas on how I can work around this problem? Many thanks, Chris
------ Original Message ------
From: "Timo Walther" <[hidden email]>
To: "Chris Miller" <[hidden email]>; "user" <[hidden email]>
Sent: 29/01/2019 09:44:14
Subject: Re: AssertionError: mismatched type $5 TIMESTAMP(3) Hi Chris, |
Sorry to reply to my own post but I wasn't able to figure out a solution for this. Does anyone have any suggestions I could try? ------ Original Message ------
From: "Chris Miller" <[hidden email]>
To: "Timo Walther" <[hidden email]>; "user" <[hidden email]>
Sent: 29/01/2019 10:06:47
Subject: Re: AssertionError: mismatched type $5 TIMESTAMP(3)
|
Hi Chris,
the error that you've observed is a bug that might be related to another bug that is not easily solvable. I created an issue for it nevertheless: https://issues.apache.org/jira/browse/FLINK-11543 In general, I think you need to adapt your program in any case. Because you are aggregating on a rowtime attribute, it will loose its time attribute property and becomes a regular timestamp. Thus, you can't use it for a temporal table join. Maybe the following training from the last FlinkForward conference might help you. I would recommend the slide set there to understand the different between streaming operations and what we call "materializing" operations: https://github.com/dataArtisans/sql-training/wiki/SQL-on-streams I hope this helps. Feel free to ask further questions. Regards, Timo Am 05.02.19 um 11:30 schrieb Chris Miller: > Exception in thread "main" java.lang.AssertionError: mismatched type > $5 TIMESTAMP(3) > at > org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2481) > at > org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2459) > at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112) > at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:151) > at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:100) > at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:34) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:107) > at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:279) > at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:241) > at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:259) > at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1605) > at > org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:230) > at > org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:344) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646) > at > org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339) > at > org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:374) > at > org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292) > at > org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812) > at > org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860) > at > org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:340) > at > org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:272) > at test.Test.main(Test.java:78) |
Hi Timo,
Thanks for the pointers, bug reports and slides, much appreciated. I'll read up to get a better understanding of the issue and hopefully figure out a more appropriate solution for what I'm trying achieve. I'll report back if I come up with anything that others might find useful. Regards, Chris ------ Original Message ------ From: "Timo Walther" <[hidden email]> To: [hidden email] Sent: 06/02/2019 16:45:26 Subject: Re: AssertionError: mismatched type $5 TIMESTAMP(3) >Hi Chris, > >the error that you've observed is a bug that might be related to another bug that is not easily solvable. > >I created an issue for it nevertheless: https://issues.apache.org/jira/browse/FLINK-11543 > >In general, I think you need to adapt your program in any case. Because you are aggregating on a rowtime attribute, it will loose its time attribute property and becomes a regular timestamp. Thus, you can't use it for a temporal table join. > >Maybe the following training from the last FlinkForward conference might help you. I would recommend the slide set there to understand the different between streaming operations and what we call "materializing" operations: > >https://github.com/dataArtisans/sql-training/wiki/SQL-on-streams > >I hope this helps. Feel free to ask further questions. > >Regards, >Timo > >Am 05.02.19 um 11:30 schrieb Chris Miller: >>Exception in thread "main" java.lang.AssertionError: mismatched type $5 TIMESTAMP(3) >> at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2481) >> at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2459) >> at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112) >> at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:151) >> at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:100) >> at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:34) >> at org.apache.calcite.rex.RexCall.accept(RexCall.java:107) >> at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:279) >> at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:241) >> at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:259) >> at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1605) >> at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:230) >> at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:344) >> at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212) >> at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646) >> at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339) >> at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:374) >> at org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292) >> at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812) >> at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860) >> at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:340) >> at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:272) >> at test.Test.main(Test.java:78) > > |
Free forum by Nabble | Edit this page |