Hi experts, I am using the Flink Table API to join two tables, which are DataStreams underneath. However, I get an assertion error of "java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)" on the rowtime column. Below are more details:
There is only one Kafka data source, which is converted to a Table and registered. One existing column is set as the rowtime (event time) attribute. Two over-window aggregation queries are run against the table, and two tables are created as results. Everything works great so far. However, when time-window joining the two result tables on the inherited rowtime, Calcite throws the "java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)" AssertionError. Can someone let me know the possible cause? F.Y.I., I renamed the rowtime column for one of the result tables.
Best,
Yan
Hi Yan,
I’d like to look into this. Could you share more about your queries and the full stack trace?
Thanks,
Xingcan
Hi Xingcan,
Thanks for your help. Attached is a sample program that reproduces the problem. While writing it, I noticed that if I remove the `distinct` keyword in the select clause, the AssertionError does not occur.
Best
Yan
From: xccui-foxmail <[hidden email]>
Sent: Wednesday, March 7, 2018 8:10 PM
To: Yan Zhou [FDS Science]
Cc: [hidden email]
Subject: Re: flink sql timed-window join throw "mismatched type" AssertionError on rowtime column

Sample.java (8K)
Hi Xingcan,
thanks for looking into this. This definitely seems to be a bug, maybe in org.apache.flink.table.calcite.RelTimeIndicatorConverter. In any case, we should create an issue for it.

Regards,
Timo

On 3/8/18 at 7:27 AM, Yan Zhou [FDS Science] wrote:
Hi Yan,

This is a bug in Flink. As a workaround, you can cast eventTime to another basic SQL type (for example, cast eventTime as varchar).

@Timo and @Xingcan, I think we have to materialize time indicators in the conditions of LogicalFilter. I created an issue, and we can have more discussion there [1].

Best,
Hequn

On Thu, Mar 8, 2018 at 8:59 PM, Timo Walther <[hidden email]> wrote:
Hi Yan & Timo,
this is confirmed to be a bug, and I’ve created an issue [1] for it.

I’ll explain more about this query. In Flink SQL / Table API, the DISTINCT keyword is implemented with an aggregation, which outputs a retract stream [2]. In that situation, all time-related fields are materialized as if they were common fields (with the TIMESTAMP type). Currently, due to this semantics problem, the time-windowed join cannot be performed on retract streams. But you could try the non-windowed join [3] after we fix this.

Best,
Xingcan

[3] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
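To make the retraction point concrete, here is a minimal plain-Java model (not Flink code; all class and method names here are invented for illustration) of how a DISTINCT aggregation turns its input into a retract stream: a key is forwarded on its first insertion and retracted only when its last copy disappears, so the output is a changelog rather than an append-only stream, and rowtime columns in it can no longer be treated as time attributes.

```java
import java.util.*;

public class DistinctRetractModel {
    // A changelog message: add == true is an insertion, false a retraction.
    record Msg(boolean add, String key) {}

    // DISTINCT keeps a count per key and only forwards the first add and
    // the last retract, so its output is a retract stream.
    static List<Msg> applyDistinct(List<Msg> input) {
        Map<String, Integer> counts = new HashMap<>();
        List<Msg> out = new ArrayList<>();
        for (Msg m : input) {
            int before = counts.getOrDefault(m.key(), 0);
            int after = before + (m.add() ? 1 : -1);
            counts.put(m.key(), after);
            if (before == 0 && after == 1) out.add(new Msg(true, m.key()));  // first copy appears
            if (before == 1 && after == 0) out.add(new Msg(false, m.key())); // last copy disappears
        }
        return out;
    }

    public static void main(String[] args) {
        // Two inserts of "a": only the first one is visible downstream.
        System.out.println(applyDistinct(List.of(
                new Msg(true, "a"), new Msg(true, "a"), new Msg(true, "b"))));
    }
}
```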
Hi Xingcan, Timo,
Thanks for the information.
I am going to convert the result tables to DataStreams and follow the logic of TimeBoundedStreamInnerJoin to do the time-windowed join myself. Is this a reasonable approach? Are there any concerns from a performance or stability perspective?
Best
Yan
From: Xingcan Cui <[hidden email]>
Sent: Thursday, March 8, 2018 8:21:42 AM
To: Timo Walther
Cc: user; Yan Zhou [FDS Science]
Subject: Re: flink sql timed-window join throw "mismatched type" AssertionError on rowtime column
Hi Yan,
I think you could try that as a workaround. Don’t forget to follow DataStreamWindowJoin and hold back the watermarks. We’ll continue improving the SQL / Table API part.

Best,
Xingcan
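For anyone weighing that approach, here is a highly simplified, self-contained sketch (plain Java over in-memory lists; all names are invented, and the real TimeBoundedStreamInnerJoin additionally keeps keyed state and cleans it up with timers) of the two ideas above: joining rows whose event times lie within a relative bound of each other, and holding the forwarded watermark back by that bound so rows that can still join are not treated as late downstream.

```java
import java.util.*;

public class TimeBoundedJoinSketch {
    record Event(long ts, String val) {}

    // Inner join: (l, r) matches when r.ts lies within
    // [l.ts - lowerBound, l.ts + upperBound] (all values in ms).
    static List<String> join(List<Event> left, List<Event> right,
                             long lowerBound, long upperBound) {
        List<String> out = new ArrayList<>();
        for (Event l : left)
            for (Event r : right)
                if (r.ts() >= l.ts() - lowerBound && r.ts() <= l.ts() + upperBound)
                    out.add(l.val() + "+" + r.val());
        return out;
    }

    // The operator may still join rows up to maxBound older than the
    // current watermark, so the watermark it forwards is held back.
    static long heldBackWatermark(long leftWm, long rightWm, long maxBound) {
        return Math.min(leftWm, rightWm) - maxBound;
    }
}
```

The nested loop stands in for the keyed state lookups of the real operator; the point is the bound check and the held-back watermark, not the scan.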
Another workaround would be to split the query into two Table API parts: you could do the join, convert into a DataStream, and convert into a Table again. The optimizer does not optimize across DataStream API calls. What should also work is to cast your eventTs to TIMESTAMP as early as possible to prevent this bug.

Let us know if this helped. I think this bug has a good chance of being fixed in 1.5.0, which will be released soon.

Regards,
Timo

On 3/9/18 at 3:28 PM, Xingcan Cui wrote:
Hi Yan,
I think it would be easier if we cast eventTs and r_eventTs as TIMESTAMP and do a non-window join. Something like:

val sql1 = "select distinct id, cast(eventTs as timestamp) as eventTs, " +

Hope this helps.

On Fri, Mar 9, 2018 at 10:51 PM, Timo Walther <[hidden email]> wrote:
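As a side note on why the cast makes this work: once eventTs is cast to TIMESTAMP it loses its time-attribute role and becomes plain data, so the join degenerates into an ordinary equi-join on id with no time bounds. A minimal plain-Java model of that resulting join (all names invented for illustration, not Flink code):

```java
import java.util.*;

public class NonWindowJoinModel {
    record Row(String id, long eventTs) {}

    // After the cast, eventTs is just a payload column; the join is a
    // regular hash equi-join on id, ignoring time entirely.
    static List<String> joinOnId(List<Row> left, List<Row> right) {
        Map<String, List<Row>> byId = new HashMap<>();
        for (Row r : right)
            byId.computeIfAbsent(r.id(), k -> new ArrayList<>()).add(r);
        List<String> out = new ArrayList<>();
        for (Row l : left)
            for (Row r : byId.getOrDefault(l.id(), List.of()))
                out.add(l.id() + ":" + l.eventTs() + "/" + r.eventTs());
        return out;
    }
}
```

The trade-off versus the time-windowed join is that a non-windowed join must keep all rows in state, since without time bounds no row ever expires.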