Enrich stream with SQL api

Enrich stream with SQL api

Marek Maj
Hello,
I am trying to use the Flink SQL API to join two tables. My stream data source is Kafka (defined through a catalog and schema registry) and my enrichment data is located in a relational database (JDBC connector). I think this setup reflects a quite common use case.

The enrichment table definition looks like this:
CREATE TABLE dim (
  ID BIGINT,
  ENRICH STRING,
  FROM_DATE TIMESTAMP(6),
  TO_DATE TIMESTAMP(6),
  WATERMARK FOR TO_DATE AS TO_DATE
) WITH (
   'connector' = 'jdbc',
   'url' = '…',
   'table-name' = '…',
   'username' = '…',
   'password' = '…'
)


And this is the join I use against the stream coming from Kafka (a table with a watermark spec); TRUNC is a UDF:
SELECT TRUNC(T1.START_TIME, 'HH24') as `START_TIME`,
       D1.ENRICH as `ENRICH`,
       T1.FIELD as `FIELD`
FROM `kafka.topic` T1, dim D1
WHERE T1.ENRICH_ID = D1.ID
  AND T1.START_TIME BETWEEN D1.TO_DATE - INTERVAL '1' DAY AND D1.TO_DATE
  AND T1.START_TIME >= D1.FROM_DATE


The resulting job graph contains two table source scan operators together with an interval join operator.
The problem I am trying to solve is how to change the character of the enrichment table. Currently, the related source task reads the whole table when the job starts and finishes afterwards. Ideally, I would like to have a continuously updated enrichment table. Is it possible to achieve this without CDC, for example by querying the whole database periodically or using some kind of cache for keys? We can assume that the enrichment table is append-only: there are no deletes or updates, only inserts for new time intervals.

If updates are not possible, how can I deal with the finished task? Due to a known issue [1], all checkpoints are aborted. Maybe I could live with restarting the job to get new enrichment data, as it is not refreshed so frequently, but checkpointing is a must.

Flink version 1.12

regards
Marek


Re: Enrich stream with SQL api

Dawid Wysakowicz-2

Hi Marek,

I am afraid I don't have a good answer for your question. The problem indeed is that the JDBC source can work only as a bounded source. As you correctly pointed out, as of now mixing bounded with unbounded sources does not work with checkpointing, which we want to address in FLIP-147 (which you linked as well).

I agree one solution would be to change the implementation of JDBCDynamicTableSource so that it produces an UNBOUNDED source. Unfortunately it is not the most straightforward task.

Another solution would be to actually use CDC. I think you could use one of the connectors from here [1], which use the embedded Debezium engine, therefore you would not need to set up any external tools, but just embed the CDC in Flink. Of course, I might be mistaken here, as I haven't tried those connectors myself.
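
Very roughly, and only as an untested sketch (I am using the MySQL connector for illustration, since the Oracle one is less mature; the option values are placeholders and the primary key is just a guess), the dimension table could then be declared directly against the database log:

CREATE TABLE dim (
  ID BIGINT,
  ENRICH STRING,
  FROM_DATE TIMESTAMP(6),
  TO_DATE TIMESTAMP(6),
  -- the actual key depends on your table; (ID, FROM_DATE) is only an assumption
  PRIMARY KEY (ID, FROM_DATE) NOT ENFORCED,
  WATERMARK FOR TO_DATE AS TO_DATE
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '…',
  'port' = '3306',
  'username' = '…',
  'password' = '…',
  'database-name' = '…',
  'table-name' = '…'
)

Such a source stays unbounded, so the task should not finish and checkpointing should keep working.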

Unfortunately I don't have any other ideas right now. Maybe someone else can chime in @Timo @Jark

Lastly, I think once you solve the problem of a finishing source, you could also consider using a temporal join [2] instead of an interval join.
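
Just to illustrate the idea (a sketch only; it assumes START_TIME is the event-time attribute of your Kafka table and that dim is a versioned table, i.e. it has a primary key and a watermark), an event-time temporal join would look roughly like:

SELECT TRUNC(T1.START_TIME, 'HH24') as `START_TIME`,
       D1.ENRICH as `ENRICH`,
       T1.FIELD as `FIELD`
FROM `kafka.topic` T1
LEFT JOIN dim FOR SYSTEM_TIME AS OF T1.START_TIME AS D1
  ON T1.ENRICH_ID = D1.ID

Whether that matches your FROM_DATE/TO_DATE semantics is something you would need to check.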

Best,

Dawid

[1] https://github.com/ververica/flink-cdc-connectors

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#temporal-joins


Re: Enrich stream with SQL api

Marek Maj
Hi Dawid,
thanks for your answers!

I guess CDC and Debezium will be the right choice for that case. Its integration with Oracle, however, is in the incubating phase as the documentation states [1], so we will need to investigate further.

I was hoping there would be some way to incorporate a LookupCache [2] in that scenario and bypass the problem of finished tasks. JDBCDynamicTableSource, which you mentioned, implements both LookupTableSource and ScanTableSource, but I could not force the planner to invoke getLookupRuntimeProvider [3].
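
For reference, this is roughly the kind of configuration I had in mind (the lookup cache options come from the JDBC connector documentation; the values are just placeholders, and as far as I understand they only take effect for lookup joins):

CREATE TABLE dim (
  ID BIGINT,
  ENRICH STRING,
  FROM_DATE TIMESTAMP(6),
  TO_DATE TIMESTAMP(6)
) WITH (
  'connector' = 'jdbc',
  'url' = '…',
  'table-name' = '…',
  'username' = '…',
  'password' = '…',
  'lookup.cache.max-rows' = '10000',
  'lookup.cache.ttl' = '10min'
)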

Anyway, I will take a closer look at the JDBCDynamicTableSource implementation and try to find a workaround.

best regards,
Marek




Re: Enrich stream with SQL api

Dawid Wysakowicz-2

The LookupTableSource is used when you join based on processing time, as described here [1]. Moreover, it supports only equi lookups, therefore it won't work with range queries as in your case.
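
To make it concrete (a sketch; it assumes you add a processing-time attribute to the Kafka table, e.g. a column defined as proc_time AS PROCTIME()), a lookup join can only express the key equality:

SELECT T1.FIELD, D1.ENRICH
FROM `kafka.topic` T1
JOIN dim FOR SYSTEM_TIME AS OF T1.proc_time AS D1
  ON T1.ENRICH_ID = D1.ID

Any FROM_DATE/TO_DATE filtering would have to happen after the lookup, so it does not really fit your range predicate.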

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#processing-time-temporal-join


Re: Enrich stream with SQL api

Marek Maj
Good point, thanks for the clarification.

best regards,
Marek
