Hello,
I am trying to use the Flink SQL API to join two tables. My stream data source is Kafka (defined through a catalog and schema registry) and my enrichment data is located in a relational database (JDBC connector). I think this setup reflects a quite common use case.

The enrichment table definition looks like this:

CREATE TABLE dim (
  ID BIGINT,
  ENRICH STRING,
  FROM_DATE TIMESTAMP(6),
  TO_DATE TIMESTAMP(6),
  WATERMARK FOR TO_DATE AS TO_DATE
) WITH (
  'connector' = 'jdbc',
  'url' = '…',
  'table-name' = '…',
  'username' = '…',
  'password' = '…'
)

And this is the join I use against the stream coming from Kafka (a table with a watermark spec); TRUNC is a UDF:

SELECT
  TRUNC(T1.START_TIME, 'HH24') as `START_TIME`,
  D1.ENRICH as `ENRICH`,
  T1.FIELD as `FIELD`
FROM `kafka.topic` T1, dim D1
WHERE T1.ENRICH_ID = D1.ID
  AND T1.START_TIME BETWEEN D1.TO_DATE - INTERVAL '1' DAY AND D1.TO_DATE
  AND T1.START_TIME >= D1.FROM_DATE

The resulting job graph contains two table source scan operators together with an interval join operator.

The problem I am trying to solve is how to change the character of the enrichment table. Currently, the related operator task reads the whole table when the job starts and finishes afterwards. Ideally, I would like to have a continuously updated enrichment table. Is it possible to achieve this without CDC, for example by querying the whole database periodically, or by using some kind of cache for keys? We can assume that the enrichment table is append-only: there are no deletes or updates, only inserts for new time intervals.

If updates are not possible, how can I deal with the finished task? Due to a known issue [1], all checkpoints are aborted. Maybe I could live with restarting the job to get new enrichment data, as it is not refreshed so frequently, but checkpointing is a must.

Flink version: 1.12

regards
Marek
Hi Marek,

I am afraid I don't have a good answer for your question. The problem indeed is that the JDBC source can work only as a bounded source. As you correctly pointed out, as of now mixing bounded with unbounded sources does not work with checkpointing, which we want to address in FLIP-147 (which you linked as well).

I agree one solution would be to change the implementation of JDBCDynamicTableSource so that it produces an UNBOUNDED source. Unfortunately, that is not the most straightforward task.

Another solution would be to actually use CDC. I think you could use one of the connectors from here [1], which use the embedded Debezium engine, so you would not need to set up any external tools, but just embed the CDC in Flink. Of course, I may be mistaken here, as I haven't tried those connectors myself.

Unfortunately, I don't have any other ideas right now. Maybe someone else can chime in @Timo @Jark

Lastly, I think once you solve the problem of a finishing source, you could also consider using a temporal join [2] instead of an interval join.

Best,
Dawid

[1] https://github.com/ververica/flink-cdc-connectors

On 12/01/2021 16:40, Marek Maj wrote:
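For reference, an event-time temporal join against the same tables could look roughly like the sketch below. Table and column names are taken from the thread; note this is an assumption-laden sketch, since it requires dim to be a versioned table (primary key plus watermark), which the plain JDBC connector does not provide out of the box — a CDC source would:

```sql
-- Sketch: event-time temporal join against a versioned dim table.
-- Assumes dim has PRIMARY KEY (ID) NOT ENFORCED and a watermark,
-- e.g. backed by one of the CDC connectors mentioned above.
SELECT
  T1.START_TIME,
  D1.ENRICH,
  T1.FIELD
FROM `kafka.topic` T1
LEFT JOIN dim FOR SYSTEM_TIME AS OF T1.START_TIME AS D1
  ON T1.ENRICH_ID = D1.ID
```

Unlike the interval join, this always picks the version of the dim row that was valid at the event time of each Kafka record.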
Hi Dawid,

thanks for your answers! I guess CDC and Debezium will be the right choice for that case. Its integration with Oracle, however, is in the incubating phase as the documentation states [1]; we will need to investigate further.

I was hoping there would be some way to incorporate a LookupCache [2] in that scenario and bypass the problem of finished tasks. JDBCDynamicTableSource, which you mentioned, implements both LookupTableSource and ScanTableSource, but I could not force the planner to invoke getLookupRuntimeProvider [3].

Anyway, I will take a closer look at the JDBCDynamicTableSource implementation and try to find a workaround.

best regards,
Marek

On Thu, 14 Jan 2021 at 20:07, Dawid Wysakowicz <[hidden email]> wrote:
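For illustration, a CDC-backed version of the dim table using the flink-cdc-connectors project linked earlier could be declared roughly as below. This is a sketch with placeholder credentials, shown with the MySQL connector since the Oracle one was still incubating at the time; option names follow the mysql-cdc connector:

```sql
-- Sketch: dim table backed by the Debezium-based mysql-cdc connector
-- instead of the bounded JDBC scan. Host/port/credentials are placeholders.
CREATE TABLE dim (
  ID BIGINT,
  ENRICH STRING,
  FROM_DATE TIMESTAMP(6),
  TO_DATE TIMESTAMP(6),
  PRIMARY KEY (ID) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '…',
  'port' = '3306',
  'username' = '…',
  'password' = '…',
  'database-name' = '…',
  'table-name' = '…'
)
```

Such a source stays unbounded, so the checkpointing problem with finished tasks does not arise.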
The LookupTableSource is used when you join based on processing time, as described here [1]. Moreover, it supports only equi-lookups, so it won't work with range queries as in your case.

Best,
Dawid

On 15/01/2021 09:25, Marek Maj wrote:
Good point, thanks for the clarification.

best regards,
Marek

On Fri, 15 Jan 2021 at 09:32, Dawid Wysakowicz <[hidden email]> wrote: