Pyflink Join with versioned view / table


Barth, Torben

Dear List,

 

I have trouble implementing a join between two streaming tables in the Python Table API.

Each record of the left table should be enriched with the latest value of the right table. The right table is updated only rarely (perhaps every 15 minutes). With my current implementation, the join emits results only when the right table changes. I want the joined table to emit a result every time a record arrives on the left side, enriched with the most recent value from the right side. I have not found a way to implement this.

I tried an implementation using a versioned view. Here is a short example:

 

left_table

root
 |-- measurement_time: TIMESTAMP(3) *ROWTIME*
 |-- x: DOUBLE
 |-- y: DOUBLE
 |-- proctime: TIMESTAMP(3) NOT NULL *PROCTIME* AS PROCTIME()
 |-- WATERMARK FOR measurement_time: TIMESTAMP(3) AS `measurement_time`

right_table

 |-- some_value: INT
 |-- id: INT
 |-- modtime: TIMESTAMP(3) *ROWTIME*

The "id" is always defined as 1.

I perform the following operations:

 

# Add a constant key to the left table so it can be joined with right_table.id (always 1).
t_env.create_temporary_view("left_table", left_table.add_columns("1.cast(INT) AS left_artificial_key"))
t_env.create_temporary_view("right_table", right_table)

sql_view = """
    -- Define a versioned view: keep only the latest row per id
    CREATE VIEW versioned_right AS
    SELECT id, some_value, modtime
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY id
                                  ORDER BY modtime DESC) AS rownum
        FROM right_table)
    WHERE rownum = 1
"""

view = t_env.execute_sql(sql_view)

# Event-time temporal join of the left table against the versioned view.
sql = """
    SELECT left_table.*, versioned_right.some_value
    FROM left_table
    LEFT JOIN versioned_right FOR SYSTEM_TIME AS OF left_table.measurement_time
    ON left_table.left_artificial_key = versioned_right.id
"""

joined = t_env.sql_query(sql)

  

   

I observed the same behavior when using a lateral join.

 

Does anybody have an idea how the join could be implemented correctly?

 

Any comments or ideas are very welcome.

 

Thanks

Torben


Fraport AG Frankfurt Airport Services Worldwide, 60547 Frankfurt am Main, Sitz der Gesellschaft: Frankfurt am Main, Amtsgericht Frankfurt am Main: HRB 7042, Umsatzsteuer-Identifikationsnummer: DE 114150623, Vorsitzender des Aufsichtsrates: Michael Boddenberg – Hessischer Minister der Finanzen; Vorstand: Dr. Stefan Schulte (Vorsitzender), Anke Giesen, Michael Müller, Dr. Pierre Dominique Prümm, Dr. Matthias Zieschang

Re: Pyflink Join with versioned view / table

Leonard Xu
Hi, Torben

> When implementing the join I get only updates when the right table changes

The event-time temporal join on a versioned table is triggered by the watermark, which is calculated from the watermarks of both the left and the right table, so you only get updates when the right table changes (it is the slower one in your case). The right table may change multiple times; we need to know when it has changed before we can emit the correct join result.

> Does anybody have an idea how the join could be implemented in the correct way?

You can try setting a proper value for `table.exec.source.idle-timeout` (e.g. 1 minute) for your job. The right table will then be marked as temporarily idle, and the downstream join operator will use only the left table's watermark, so you get updates after at most 1 minute [1].
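To make the effect of idleness concrete, here is a minimal pure-Python sketch of how a two-input operator could combine its input watermarks. It is a simplified model, not Flink's actual implementation; the function name `combined_watermark` and the sample timestamps are assumptions made for illustration.

```python
from typing import Dict, Optional, Set


def combined_watermark(watermarks: Dict[str, int], idle: Set[str]) -> Optional[int]:
    """Return the minimum watermark over all inputs that are not idle.

    Returns None when every input is idle, i.e. nothing can advance.
    """
    active = [wm for name, wm in watermarks.items() if name not in idle]
    return min(active) if active else None


# Hypothetical watermarks in milliseconds: the right table lags far behind.
wms = {"left_table": 1000, "right_table": 100}

# Both inputs active: the slow right table holds the join back.
print(combined_watermark(wms, idle=set()))            # 100

# Right table marked idle: only the left watermark counts.
print(combined_watermark(wms, idle={"right_table"}))  # 1000
```

In this model the join can only progress as far as the slowest active input, which is why marking the rarely updated side as idle lets the left side drive the output.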

Another way is to look up the latest version of the right table (if the table implements LookupTableSource, e.g. JDBC/HBase tables). The join then always returns the most up-to-date value for a given key, and you get an update immediately for every input record from the left table [2].

Best,
Leonard



Re: Pyflink Join with versioned view / table

Barth, Torben

Hi Leonard,

 

thanks for your answer.

 

My data source is Kafka, so I cannot use the second option. The first option is unfortunately not working: I set the parameter, but the updates are still triggered only by a change on the right side.

As a workaround I am using the last_value operator for now. This seems to work at the moment but could cause problems in the future.

 

Best

Torben

 


Re: Pyflink Join with versioned view / table

Leonard Xu
Hi, Torben

Happy to hear you have addressed your problem. The first option handles the situation where some partitions of the Kafka topic receive no data.
But if none of the partitions receive data, the watermark is not pushed forward and the temporal join is not triggered.
Otherwise we could produce an unexpected join result, because a change on the versioned-table side might arrive after we have already emitted the join result.
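The reasoning above can be illustrated with a small pure-Python model of an event-time temporal join that buffers left records until the watermark has passed their timestamp. This is a simplified sketch under stated assumptions, not Flink's operator: the class `EventTimeTemporalJoin`, its method names, and the integer timestamps are all hypothetical.

```python
from bisect import bisect_right
from typing import List, Optional, Tuple


class EventTimeTemporalJoin:
    """Toy model: left rows wait until the watermark passes their timestamp."""

    def __init__(self) -> None:
        self.versions: List[Tuple[int, int]] = []   # (modtime, some_value)
        self.pending: List[Tuple[int, float]] = []  # (measurement_time, x)

    def on_right(self, modtime: int, some_value: int) -> None:
        self.versions.append((modtime, some_value))
        self.versions.sort()

    def on_left(self, measurement_time: int, x: float) -> None:
        self.pending.append((measurement_time, x))

    def on_watermark(self, watermark: int) -> List[Tuple[int, float, Optional[int]]]:
        # Only rows whose timestamp is covered by the watermark may be emitted.
        ready = sorted(row for row in self.pending if row[0] <= watermark)
        self.pending = [row for row in self.pending if row[0] > watermark]
        times = [t for t, _ in self.versions]
        out = []
        for ts, x in ready:
            # Pick the latest version whose modtime is <= the row's timestamp.
            i = bisect_right(times, ts)
            out.append((ts, x, self.versions[i - 1][1] if i else None))
        return out


join = EventTimeTemporalJoin()
join.on_right(0, 10)         # version valid from t=0
join.on_left(5, 1.5)         # left record at t=5
print(join.on_watermark(3))  # [] : the record is still buffered
join.on_right(4, 20)         # a newer version (t=4) arrives afterwards
print(join.on_watermark(6))  # [(5, 1.5, 20)]
```

Had the left record been emitted as soon as it arrived, it would have been joined with the value 10, although the version valid at t=5 is 20. Waiting for the watermark avoids that, at the cost of producing no output while the watermark stands still.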

Best,
Leonard

Re: Pyflink Join with versioned view / table

Barth, Torben
Hi Leonard,

I have just realized that the "last_value" operator does not work either, since it also produces updates when the right side changes. I just need the current state at the moment I receive a message on the left side. It is indeed a lookup that I want to perform, not a real join.
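The lookup behaviour described here can be sketched in plain Python, independent of Flink: right-side records only update a piece of state, and every left-side record produces exactly one output, enriched with whatever value is current on arrival. The function name `enrich_with_latest` and the event encoding are assumptions made for this illustration.

```python
from typing import Dict, Iterable, Iterator, Optional, Tuple

Event = Tuple[str, Tuple[int, float]]  # (side, (key, payload))


def enrich_with_latest(events: Iterable[Event]) -> Iterator[Tuple[float, Optional[float]]]:
    """Emit one output per left record, using the right value seen most recently."""
    latest: Dict[int, float] = {}
    for side, (key, payload) in events:
        if side == "right":
            latest[key] = payload            # update state, emit nothing
        else:
            yield (payload, latest.get(key))  # enrich with the current state


events = [
    ("left", (1, 0.5)),   # no right value yet
    ("right", (1, 10)),
    ("left", (1, 0.7)),
    ("right", (1, 20)),
    ("left", (1, 0.9)),
]
print(list(enrich_with_latest(events)))
# [(0.5, None), (0.7, 10), (0.9, 20)]
```

Note that the result depends on arrival order rather than on event time, so replaying the same data with a different interleaving can give different output.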

Since my topic has only one partition, the first option will not work for me either. Following your comments, I guess I need to reconsider my design.

Best,
Torben
