Editing Rowtime for SQL Table

Editing Rowtime for SQL Table

Satyam Shekhar
Hello,

I use Flink for continuous evaluation of SQL queries on streaming data. One of our use cases requires running recursive SQL queries. I am unable to find a way to change the rowtime attribute of an intermediate result table.

For example, let's assume that there is a table T0 with schema -
root
 |-- str1: STRING
 |-- int1: BIGINT
 |-- utime: TIMESTAMP(3)
 |-- itime: TIMESTAMP(3) *ROWTIME*

Now, let's create a view V0 -
var V0 = tEnv_.sqlQuery("select str1, int1, utime, itime from T0");

I wish to change the rowtime of V0 from itime to utime. I tried doing -

V0 = V0.addOrReplaceColumns($("utime").as("utime").rowtime());

but ran into the following exception -

org.apache.flink.table.api.ValidationException: Window properties can only be used on windowed tables.
at org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:854) ~[flink-table-api-java-1.11.1.jar:1.11.1]
at org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:843) ~[flink-table-api-java-1.11.1.jar:1.11.1]
at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) ~[flink-table-api-java-1.11.1.jar:1.11.1]
at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) ~[flink-table-api-java-1.11.1.jar:1.11.1]
at org.apache.flink.table.operations.utils.OperationTreeBuilder.lambda$project$1(OperationTreeBuilder.java:158) ~[flink-table-api-java-1.11.1.jar:1.11.1]
at java.base/java.util.ArrayList.forEach(ArrayList.java:1540) ~[na:na]
at org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:158) ~[flink-table-api-java-1.11.1.jar:1.11.1]
at org.apache.flink.table.operations.utils.OperationTreeBuilder.addColumns(OperationTreeBuilder.java:207) ~[flink-table-api-java-1.11.1.jar:1.11.1]
at org.apache.flink.table.api.internal.TableImpl.addColumnsOperation(TableImpl.java:475) ~[flink-table-api-java-1.11.1.jar:1.11.1]
at org.apache.flink.table.api.internal.TableImpl.addOrReplaceColumns(TableImpl.java:459) ~[flink-table-api-java-1.11.1.jar:1.11.1]


Any guidance on how to address this? 

Regards,
Satyam
Re: Editing Rowtime for SQL Table

Satyam Shekhar
Thanks for your replies, Matthias and Timo.

Converting the Table to a DataStream, assigning a new watermark and rowtime attribute, and converting back to a Table makes sense. One challenge with that approach is that the Table-to-DataStream conversion could emit a retract stream. However, I think that can now be handled with the new TableSource API (in 1.11), which allows a TableSource to emit retractions.

I'll try this approach when I migrate to the new API and report back.

Regards,
Satyam

On Tue, Sep 1, 2020 at 4:46 AM Timo Walther wrote:
Hi Satyam,

Matthias is right. A rowtime attribute cannot be modified and needs to be passed "as is" through the pipeline. The only exceptions are operations that offer a new rowtime, such as `TUMBLE_ROWTIME` or `MATCH_ROWTIME`. In your case, you need to define utime as the time attribute. If this is not possible, you can either express the computation in regular SQL (with non-streaming optimizations), or go to the DataStream API, prepare the table there (assign a new watermark and StreamRecord timestamp), and go back to the Table API.

I hope this helps.

Regards,
Timo
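
As a rough illustration of "define utime as the time attribute": if T0 is created through SQL DDL, the rowtime can be chosen with a `WATERMARK FOR` clause. The sketch below only builds and prints such a DDL string for the schema from the original post. The class and method names, the 5-second delay, and the `'connector' = '...'` placeholder are assumptions made for illustration, not details from this thread.

```java
// Minimal sketch: builds a Flink SQL DDL string that declares the rowtime
// attribute of T0 via a WATERMARK clause. Nothing here talks to Flink.
class RowtimeDdlSketch {

    /**
     * Returns a CREATE TABLE statement for the T0 schema from the thread,
     * with the given column declared as the rowtime attribute.
     * maxDelaySeconds is a hypothetical out-of-orderness bound.
     */
    static String createTableDdl(String rowtimeColumn, int maxDelaySeconds) {
        return String.join("\n",
            "CREATE TABLE T0 (",
            "  str1 STRING,",
            "  int1 BIGINT,",
            "  utime TIMESTAMP(3),",
            "  itime TIMESTAMP(3),",
            "  WATERMARK FOR " + rowtimeColumn + " AS " + rowtimeColumn
                + " - INTERVAL '" + maxDelaySeconds + "' SECOND",
            ") WITH (",
            // Placeholder: the real connector options are not given in the thread.
            "  'connector' = '...'",
            ")");
    }

    public static void main(String[] args) {
        // In a Flink 1.11 job the string would be passed to
        // TableEnvironment#executeSql once the WITH options are filled in.
        System.out.println(createTableDdl("utime", 5));
    }
}
```

With utime declared this way, a query such as the one behind V0 would carry utime as its rowtime from the start, so no later modification is needed. This only applies when the table definition is under your control.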

On Tue, Sep 1, 2020 at 11:40 AM Matthias Pohl wrote:
Hi Satyam,
Thanks for your post. Unfortunately, it looks like you cannot change the rowtime column here. The rowtime is strongly coupled with the watermarks feature. If the rowtime column is changed, we cannot ensure that the watermarks are still aligned with it, as Fabian mentioned in [1].

@Timo Walther: Could you verify my findings?

Best,
Matthias

[1] https://stackoverflow.com/questions/52784089/flink-table-sql-api-modify-rowtime-attribute-after-session-window-aggregation
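
To make the alignment problem concrete, here is a small plain-Java simulation (no Flink dependency). It assumes watermarks are derived from itime with a bounded out-of-orderness rule, evaluated per record for simplicity, and then checks how many records would count as late if utime were treated as the event time instead. The sample timestamps, the 1-second bound, and all class and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of why a rowtime column cannot simply be swapped:
// the watermark was computed from itime, so it says nothing about utime.
class WatermarkAlignmentSketch {

    /** A record of T0 reduced to its two timestamps (epoch milliseconds). */
    static final class Event {
        final long utime;
        final long itime;

        Event(long utime, long itime) {
            this.utime = utime;
            this.itime = itime;
        }
    }

    /** Hypothetical sample data, listed in arrival order. */
    static List<Event> sampleEvents() {
        List<Event> events = new ArrayList<>();
        events.add(new Event(1_000L, 10_000L));
        events.add(new Event(5_000L, 11_000L));
        events.add(new Event(2_000L, 12_000L));
        events.add(new Event(11_500L, 13_000L));
        return events;
    }

    /**
     * Counts records whose chosen timestamp is at or behind the watermark
     * when they arrive. The watermark always follows itime:
     * watermark = (max itime seen so far) - maxDelayMs - 1.
     */
    static int countLate(List<Event> events, boolean useUtime, long maxDelayMs) {
        // Start so that the first computed watermark is Long.MIN_VALUE.
        long maxItime = Long.MIN_VALUE + maxDelayMs + 1;
        int late = 0;
        for (Event e : events) {
            long watermark = maxItime - maxDelayMs - 1;
            long timestamp = useUtime ? e.utime : e.itime;
            if (timestamp <= watermark) {
                late++;
            }
            maxItime = Math.max(maxItime, e.itime);
        }
        return late;
    }

    public static void main(String[] args) {
        List<Event> events = sampleEvents();
        System.out.println("late when rowtime = itime: " + countLate(events, false, 1_000L));
        System.out.println("late when rowtime = utime: " + countLate(events, true, 1_000L));
    }
}
```

In this model no record is late with respect to itime, while some records are already behind the watermark once utime is used as the event time. That is the misalignment a new watermark assignment on utime would have to resolve.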

Re: Editing Rowtime for SQL Table

Timo Walther
Yes, the new TableSource API allows a source to emit retractions.
However, it does not give you direct access to the DataStream API.

FLIP-136 [1] might help you in the near future. We hope it can be part
of 1.12.

Regards,
Timo

[1]
https://lists.apache.org/thread.html/r62b47ec6812ddbafed65ac79e31ca0305099893559f1e5a991dee550%40%3Cdev.flink.apache.org%3E
