Flink SQL continuous join checkpointing

Flink SQL continuous join checkpointing

Taras Moisiuk
Hi everyone!
I'm using Flink 1.12.0 with SQL API.
My streaming job is basically a join of two dynamic tables (from Kafka topics), with the result inserted into a PostgreSQL table.

I have enabled watermarking based on the Kafka topic timestamp column for each table in the join:

CREATE TABLE table1 (
    ....,
    kafka_time TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR kafka_time AS kafka_time - INTERVAL '2' HOUR
)
WITH (
    'format' = 'json',
    ...

But the checkpoint size for the join task keeps growing, despite the watermarks on the tables and the "Low watermark" value shown in the UI.

As far as I understand, outdated records from both tables should be dropped from the checkpoint after 2 hours, but it looks like the job holds all state accumulated since the beginning.

Should I enable this behavior for outdated records explicitly, or modify the join query?
Thank you!

Re: Flink SQL continuous join checkpointing

Leonard Xu
Hi, Taras

> But the checkpoint size for the join task keeps growing, despite the watermarks on the tables and the "Low watermark" value shown in the UI.
> As far as I understand, outdated records from both tables should be dropped from the checkpoint after 2 hours, but it looks like the job holds all state accumulated since the beginning.

What kind of join do you use?

If you're using an interval join [1], the outdated data in the join operator state is cleaned up after the interval join time period plus the watermark interval, so your understanding is right in this case.
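
For illustration, an interval join condition looks roughly like the sketch below (hypothetical table and column names, not the actual schema from this thread): both inputs declare an event-time attribute with a watermark, and the join condition bounds one side's time relative to the other's, so state outside the window can be dropped once the watermark passes.

-- Sketch with hypothetical tables `orders` and `payments`, each declaring a
-- watermark on kafka_time in its DDL. Rows more than 10 minutes apart fall
-- outside the interval and their state becomes eligible for cleanup.
SELECT o.order_id, p.status
FROM orders o
JOIN payments p
  ON o.order_id = p.order_id
 AND p.kafka_time BETWEEN o.kafka_time - INTERVAL '10' MINUTE AND o.kafka_time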

If you're using a regular join, Flink keeps all data in state to ensure the join semantics match a classical database, which means the operator state will keep growing. You can set the option table.exec.state.ttl [2] to clean up outdated data in state according to your business requirements.
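
For example, setting the option could look like the sketch below (a minimal sketch assuming the Flink SQL client; the exact SET syntax and a sensible retention value depend on your Flink version and business requirements). Keep in mind that with a TTL, records arriving after the retention period may lead to incomplete join results.

-- Sketch: retain idle join state for 2 hours before it becomes eligible for cleanup
SET table.exec.state.ttl = 2 h;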


Best,
Leonard

Re: Flink SQL continuous join checkpointing

Taras Moisiuk
Hi Leonard,

Thank you for the answer. In fact, I had been using a regular join because my interval condition was based on the wrong column.

I extended my join with a condition on the time attribute column, and that solved the problem:
...
FROM table_fx fx
LEFT JOIN table_v v ON v.active = fx.instrument_active_id
AND v.kafka_time BETWEEN fx.kafka_time - INTERVAL '10' MINUTE AND fx.kafka_time
...