Hello,

I use Flink for continuous evaluation of SQL queries on streaming data. One of our use cases requires running recursive SQL queries. I am unable to find a way to change the rowtime attribute of an intermediate result table.

For example, let's assume that there is a table T0 with schema -

    root
     |-- str1: STRING
     |-- int1: BIGINT
     |-- utime: TIMESTAMP(3)
     |-- itime: TIMESTAMP(3) *ROWTIME*

Now, let's create a view V0 -

    var V0 = tEnv_.sqlQuery("select str1, int1, utime, itime from T0");

I wish to change the rowtime of V0 from itime to utime. I tried doing -

    V0 = V0.addOrReplaceColumns($("utime").as("utime").rowtime());

but ran into the following exception -

    org.apache.flink.table.api.ValidationException: Window properties can only be used on windowed tables.
        at org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:854) ~[flink-table-api-java-1.11.1.jar:1.11.1]
        at org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:843) ~[flink-table-api-java-1.11.1.jar:1.11.1]
        at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) ~[flink-table-api-java-1.11.1.jar:1.11.1]
        at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) ~[flink-table-api-java-1.11.1.jar:1.11.1]
        at org.apache.flink.table.operations.utils.OperationTreeBuilder.lambda$project$1(OperationTreeBuilder.java:158) ~[flink-table-api-java-1.11.1.jar:1.11.1]
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1540) ~[na:na]
        at org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:158) ~[flink-table-api-java-1.11.1.jar:1.11.1]
        at org.apache.flink.table.operations.utils.OperationTreeBuilder.addColumns(OperationTreeBuilder.java:207) ~[flink-table-api-java-1.11.1.jar:1.11.1]
        at org.apache.flink.table.api.internal.TableImpl.addColumnsOperation(TableImpl.java:475) ~[flink-table-api-java-1.11.1.jar:1.11.1]
        at org.apache.flink.table.api.internal.TableImpl.addOrReplaceColumns(TableImpl.java:459) ~[flink-table-api-java-1.11.1.jar:1.11.1]

Any guidance on how to address this?

Regards,
Satyam
Thanks for your replies, Matthias and Timo.

Converting the Table to a DataStream, assigning a new watermark and rowtime attribute, and converting back to a Table makes sense. One challenge with that approach is that the Table-to-DataStream conversion could emit a stream with retractions. However, I think that can now be handled with the new TableSource API (in 1.11), which allows a TableSource to emit retractions.

I'll try this approach when I migrate to the new API and report back.

Regards,
Satyam
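To make the retraction concern concrete: when a Table is converted with `toRetractStream`, Flink encodes each change as a pair of a Boolean flag and a row, where `true` adds the row and `false` retracts a previously emitted one. The sketch below is plain Java with no Flink dependency and only models that encoding; the class `RetractStreamSketch`, the `Change` type, and `materialize` are hypothetical names introduced for illustration, and rows are represented as simple strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RetractStreamSketch {

    /** One change message: add == true inserts the row, add == false retracts it. */
    public static final class Change {
        final boolean add;
        final String row;

        public Change(boolean add, String row) {
            this.add = add;
            this.row = row;
        }
    }

    /** Folds change messages into the current result, kept as row -> multiplicity. */
    public static Map<String, Integer> materialize(List<Change> changes) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Change c : changes) {
            int updated = counts.getOrDefault(c.row, 0) + (c.add ? 1 : -1);
            if (updated == 0) {
                counts.remove(c.row); // the row has been fully retracted
            } else {
                counts.put(c.row, updated);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Change> changes = new ArrayList<>();
        changes.add(new Change(true, "a,1"));   // first result for key a
        changes.add(new Change(false, "a,1"));  // retract it ...
        changes.add(new Change(true, "a,2"));   // ... and emit the updated result
        changes.add(new Change(true, "b,1"));
        // Only "a,2" and "b,1" remain in the materialized result.
        System.out.println(materialize(changes));
    }
}
```

Any consumer that reassigns timestamps on such a stream has to decide what a retraction's timestamp means, which is why an append-only conversion is the simpler case.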
Yes, the new TableSource API allows emitting retractions. However, it does not give you direct access to the DataStream API. FLIP-136 [1] might help you in the near future. We hope it can be part of 1.12.

Regards,
Timo

[1] https://lists.apache.org/thread.html/r62b47ec6812ddbafed65ac79e31ca0305099893559f1e5a991dee550%40%3Cdev.flink.apache.org%3E

On 01.09.20 22:55, Satyam Shekhar wrote:
> Thanks for your replies Matthias and Timo.
>
> Converting the Table to DataStream, assigning a new Watermark & Rowtime
> attribute, and converting back to Table makes sense. One challenge with
> that approach is that Table to DataStream conversion could emit
> retractable data stream, however, I think, that can now be handled with
> the new TableSource API (in 1.11) that allows TableSource to emit
> retractions.
>
> I'll try this approach when I migrate to the new API and report back.
>
> Regards,
> Satyam
>
> On Tue, Sep 1, 2020 at 4:46 AM Timo Walther <[hidden email]> wrote:
>
> > Hi Satyam,
> >
> > Matthias is right. A rowtime attribute cannot be modified and needs
> > to be passed "as is" through the pipeline. The only exceptions are
> > if a newer rowtime is offered such as `TUMBLE_ROWTIME` or
> > `MATCH_ROWTIME`. In your case, you need to define utime as the time
> > attribute. If this is not possible, you either express the
> > computation in regular SQL (with non-streaming optimizations) or you
> > go to the DataStream API, prepare the table (assign a new watermark and
> > StreamRecord timestamp there) and go back to the Table API.
> >
> > I hope this helps.
> >
> > Regards,
> > Timo
> >
> > On Tue, Sep 1, 2020 at 11:40 AM Matthias Pohl <[hidden email]> wrote:
> >
> > > Hi Satyam,
> > > Thanks for your post. Unfortunately, it looks like you cannot
> > > change the rowtime column here. The rowtime is strongly coupled
> > > with the Watermarks feature. By changing the rowtime column we
> > > cannot ensure that the watermarks are still aligned as Fabian
> > > mentioned in [1].
> > >
> > > @Timo Walther: Could you verify my findings?
> > >
> > > Best,
> > > Matthias
> > >
> > > [1] https://stackoverflow.com/questions/52784089/flink-table-sql-api-modify-rowtime-attribute-after-session-window-aggregation
> > >
> > > On Mon, Aug 31, 2020 at 6:44 PM Satyam Shekhar <[hidden email]> wrote:
> > >
> > > > Hello,
> > > >
> > > > I use Flink for continuous evaluation of SQL queries on
> > > > streaming data. One of the use cases requires us to run
> > > > recursive SQL queries. I am unable to find a way to edit
> > > > rowtime time attribute of the intermediate result table.
> > > >
> > > > For example, let's assume that there is a table T0 with schema -
> > > > root
> > > >  |-- str1: STRING
> > > >  |-- int1: BIGINT
> > > >  |-- utime: TIMESTAMP(3)
> > > >  |-- itime: TIMESTAMP(3) *ROWTIME*
> > > >
> > > > Now, let's create a view V0 -
> > > > var V0 = tEnv_.sqlQuery("select str1, int1, utime, itime from T0");
> > > >
> > > > I wish to change the rowtime of V0 from itime to utime. I
> > > > tried doing -
> > > >
> > > > V0 = V0.addOrReplaceColumns($("utime").as("utime").rowtime());
> > > >
> > > > but ran into the following exception -
> > > >
> > > > org.apache.flink.table.api.ValidationException: Window
> > > > properties can only be used on windowed tables.
> > > >
> > > > Any guidance on how to address this?
> > > >
> > > > Regards,
> > > > Satyam
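Matthias's point about watermark alignment can be illustrated with a small simulation. This is a minimal sketch in plain Java, not Flink code: it assumes a simplified bounded-out-of-orderness rule (watermark = highest itime seen so far minus a fixed delay) and treats a row as late when its timestamp is strictly behind the watermark at arrival. The class `WatermarkAlignmentSketch`, the `Event` type, `countLate`, and the sample values are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;

public class WatermarkAlignmentSketch {

    /** A row carrying both candidate time columns, in epoch milliseconds. */
    public static final class Event {
        final long itime;
        final long utime;

        public Event(long itime, long utime) {
            this.itime = itime;
            this.utime = utime;
        }
    }

    /**
     * Counts rows whose chosen timestamp is already behind a watermark that
     * was generated from the itime column of the rows seen before them.
     */
    public static int countLate(List<Event> events, long maxOutOfOrderness, boolean useUtime) {
        long watermark = Long.MIN_VALUE;
        int late = 0;
        for (Event e : events) {
            long timestamp = useUtime ? e.utime : e.itime;
            if (timestamp < watermark) {
                late++;
            }
            // Watermarks keep following itime, the original rowtime column.
            watermark = Math.max(watermark, e.itime - maxOutOfOrderness);
        }
        return late;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(1000L, 900L),
                new Event(2000L, 400L),
                new Event(3000L, 2900L),
                new Event(4000L, 100L));
        System.out.println(countLate(events, 500L, false)); // 0: itime agrees with its own watermarks
        System.out.println(countLate(events, 500L, true));  // 2: utime falls behind them
    }
}
```

With itime as the timestamp no row is late, because the watermarks were derived from that same column. Reading the same rows by utime makes two of them late, which is why new watermarks have to be generated for utime (for example in the DataStream API, as Timo suggests) rather than relabeling the column.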