Dear List,

I have trouble implementing a join between two streaming tables in the Python Table API. The left table of my join should be enriched with the last value of the right table. The right table is updated only rarely (maybe after 15 minutes). When implementing the join, I only get updates when the right table changes. I want to trigger an update of the joined table every time I receive a record on the left side. The record should be enriched with the most recent value of the right side. I have not found a way to get the desired result.

I tried an implementation using a versioned view. Here is a short example:

left_table
root
 |-- measurement_time: TIMESTAMP(3) *ROWTIME*
 |-- x: DOUBLE
 |-- y: DOUBLE
 |-- proctime: TIMESTAMP(3) NOT NULL *PROCTIME* AS PROCTIME()
 |-- WATERMARK FOR measurement_time: TIMESTAMP(3) AS `measurement_time`

right_table
 |-- some_value: INT
 |-- id: INT
 |-- modtime: TIMESTAMP(3) *ROWTIME*

The "id" is always defined as 1. I perform the following operations:

    t_env.create_temporary_view("left_table", left_table.add_columns("1.cast(INT) AS left_artificial_key"))
    t_env.create_temporary_view("right_table", right_table)

    sql_view = """
        -- Define a versioned view
        CREATE VIEW versioned_right AS
        SELECT id, some_value, modtime
        FROM (
            SELECT *,
                   ROW_NUMBER() OVER (PARTITION BY id ORDER BY modtime DESC) AS rownum
            FROM right_table)
        WHERE rownum = 1
    """
    view = t_env.execute_sql(sql_view)

    sql = """
        SELECT left_table.*, versioned_right.some_value
        FROM left_table
        LEFT JOIN versioned_right FOR SYSTEM_TIME AS OF left_table.measurement_time
        ON left_table.left_artificial_key = versioned_right.id
    """
    joined = t_env.sql_query(sql)

I observed the same behavior when using a lateral join.

Does anybody have an idea how the join could be implemented correctly? Any comments or ideas are very welcome.

Thanks,
Torben

Fraport AG Frankfurt Airport Services Worldwide, 60547 Frankfurt am Main, Sitz der Gesellschaft: Frankfurt am Main, Amtsgericht Frankfurt am Main: HRB 7042, Umsatzsteuer-Identifikationsnummer: DE 114150623, Vorsitzender des Aufsichtsrates: Michael Boddenberg – Hessischer Minister der Finanzen; Vorstand: Dr. Stefan Schulte (Vorsitzender), Anke Giesen, Michael Müller, Dr. Pierre Dominique Prümm, Dr. Matthias Zieschang

Hi, Torben

An event-time temporal join against a versioned table is triggered by the watermark, which is calculated from both the left and the right table's watermarks, so you only get updates when the right table changes (it is the slower one in your case). The right table may change multiple times; we need to know when it changes before we can output the correct join result.

You can try setting a proper value for `table.exec.source.idle-timeout` (e.g. 1 minute) for your job. The right table will then be marked as temporarily idle and the downstream join operator will use only the left table's watermark, so you get updates after at most 1 minute [1]. Alternatively, you can look up the latest right table (if the table implements LookupTableSource, e.g. JDBC/HBase tables): the join then always returns the most up-to-date value for a given key, and you get an update immediately when a record arrives from the left table [2].

Best,
Leonard

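To make the trigger behaviour described above concrete, here is a minimal pure-Python sketch of it. It is a toy model and not Flink code: the class `TemporalJoinSketch` and its methods are hypothetical names, timestamps are plain integers, and each input's watermark is assumed to equal the largest timestamp seen on that input so far. Left rows are buffered until the combined watermark (the minimum of both inputs, or only the left one while the right input is marked idle) has passed their timestamp.

```python
from bisect import bisect_right
from dataclasses import dataclass, field


@dataclass
class TemporalJoinSketch:
    """Toy model of an event-time temporal join (hypothetical, not Flink API)."""
    left_wm: float = float("-inf")
    right_wm: float = float("-inf")
    right_idle: bool = False
    pending: list = field(default_factory=list)   # buffered left rows: (ts, payload)
    versions: list = field(default_factory=list)  # right versions: (ts, value)

    def _watermark(self):
        # An idle right input is ignored; otherwise the slower input wins.
        if self.right_idle:
            return self.left_wm
        return min(self.left_wm, self.right_wm)

    def _emit_ready(self):
        wm = self._watermark()
        ready = sorted((r for r in self.pending if r[0] <= wm), key=lambda r: r[0])
        self.pending = [r for r in self.pending if r[0] > wm]
        version_times = [v[0] for v in self.versions]
        out = []
        for ts, payload in ready:
            # Pick the latest right version whose timestamp is <= the row time.
            i = bisect_right(version_times, ts)
            value = self.versions[i - 1][1] if i else None
            out.append((ts, payload, value))
        return out

    def on_left(self, ts, payload):
        self.pending.append((ts, payload))
        self.left_wm = max(self.left_wm, ts)
        return self._emit_ready()

    def on_right(self, ts, value):
        self.right_idle = False  # new data ends the idle state
        self.versions.append((ts, value))
        self.versions.sort(key=lambda v: v[0])
        self.right_wm = max(self.right_wm, ts)
        return self._emit_ready()

    def mark_right_idle(self):
        # Stands in for what an idle timeout would do after it expires.
        self.right_idle = True
        return self._emit_ready()


join = TemporalJoinSketch()
join.on_right(0, 10)
print(join.on_left(5, "a"))    # [] : the right watermark is still at 0
print(join.on_right(7, 20))    # [(5, 'a', 10)] : released by the right-side change
print(join.on_left(8, "c"))    # [] : buffered again
print(join.mark_right_idle())  # [(8, 'c', 20)] : only the left watermark counts now
```

In this model, left rows are released only when the right side advances or is declared idle, which matches the behaviour reported in the question.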
Hi Leonard,

thanks for your answer. My data source is Kafka, so I cannot use the second option. The first option is unfortunately not working: I set the parameter, but updates are still only triggered by a change on the right side.

As a workaround I am using the last_value operator for now. This seems to work for me at the moment, but it could result in problems
in the future.

Best,
Torben

Hi, Torben

Happy to hear you have addressed your problem. The first option resolves the situation where only some partitions of the Kafka topic receive no data, but if none of the partitions receives data, the watermark will not be pushed forward and the temporal join will not be triggered. Otherwise, we could produce an unexpected join result, because a change on the versioned table side might arrive after we have already output the join result.

Best,
Leonard

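The distinction between "some partitions idle" and "all partitions idle" can be sketched in a few lines of plain Python. This only models the behaviour as described in the reply above; `source_watermark` is a hypothetical helper, not a Flink function, and returning `None` stands for "no new watermark is emitted".

```python
def source_watermark(partition_watermarks, idle):
    """Combine per-partition watermarks of one source (toy model, not Flink API).

    partition_watermarks: dict mapping partition id -> last watermark seen
    idle: set of partition ids currently considered idle
    """
    active = [wm for p, wm in partition_watermarks.items() if p not in idle]
    # Idle partitions are skipped, so they no longer hold the minimum back.
    # With no active partition left there is nothing to advance the watermark.
    return min(active) if active else None


print(source_watermark({0: 100, 1: 40}, idle=set()))  # 40
print(source_watermark({0: 100, 1: 40}, idle={1}))    # 100
print(source_watermark({0: 100}, idle={0}))           # None
```

The last call corresponds to a topic with a single partition that receives no data: no partition remains active, so the watermark does not move.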
Hi Leonard,

I have just realized that the "last_value" operator does not work either, since it produces updates whenever the right side changes. I just need the current state at the moment I receive a message on the left side. What I want to perform is indeed a lookup and not a real join. Since my topic has only one partition, the first option will not work for me either. Following your comments, I guess I need to reconsider the design of my solution.

Best,
Torben

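The lookup semantics described in this message (enrich each left record with whatever the right side holds at that moment, and emit nothing when the right side changes) can be illustrated with a small pure-Python sketch. The class `LatestValueEnricher` and its method names are hypothetical and only model the desired behaviour; how this would map onto a Flink operator is not covered in this thread.

```python
class LatestValueEnricher:
    """Toy model of a processing-time lookup (hypothetical, not Flink API)."""

    def __init__(self):
        self._latest = {}  # key -> most recent right-side value

    def on_right(self, key, value):
        # Only remember the value; a right-side change emits nothing.
        self._latest[key] = value

    def on_left(self, key, record):
        # Emit immediately, using whatever the right side holds right now.
        return {**record, "some_value": self._latest.get(key)}


enricher = LatestValueEnricher()
enricher.on_right(1, 10)
print(enricher.on_left(1, {"x": 1.0, "y": 2.0}))  # some_value is 10
enricher.on_right(1, 20)                          # no output here
print(enricher.on_left(1, {"x": 3.0, "y": 4.0}))  # some_value is 20
```

Unlike the event-time temporal join, the result here depends on arrival order rather than on timestamps, so replaying the same data can produce different output.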