What is the best way to have a cache of an external database in Flink?

What is the best way to have a cache of an external database in Flink?

HarshithBolar

Hi all,

The external database consists of a set of rules for each key; these rules should be applied to each stream element in the Flink job. Because it is very expensive to make a DB call for each element to retrieve its rules, I want to fetch the rules from the database at initialization and store them in a local cache.

When rules are updated in the external database, a status change event is published to the Flink job, which should be used to re-fetch the rules and refresh this cache.

What is the best way to achieve what I've described? I looked into keyed state, but initializing all keys and refreshing them on update doesn't seem possible.

Thanks,

Harshith


Re: What is the best way to have a cache of an external database in Flink?

selvarajchennappan@gmail.com
Hi,
Perhaps broadcast state is a natural fit for this scenario.

Thanks,
Selvaraj C


Re: What is the best way to have a cache of an external database in Flink?

Jan Oelschlegel

But then you need a way to consume a database as a DataStream.

I found this one: https://github.com/ververica/flink-cdc-connectors.

I want to implement a similar use case, but I don't know how to parse the SourceRecord (which comes from the connector) into a POJO for further processing.

Best,

Jan

 


Re: What is the best way to have a cache of an external database in Flink?

David Anderson
I provided an answer on Stack Overflow, where I said the following:

A few different mechanisms in Flink may be relevant to this use case, depending on your detailed requirements.

Broadcast State

Jaya Ananthram has already covered the idea of using broadcast state in his answer. This makes sense if the rules should be applied globally, for every key, and if you can find a way to collect and broadcast the updates.

Note that the Context passed to the processBroadcastElement() method of a KeyedBroadcastProcessFunction contains the method applyToKeyedState(StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function). This means you can register a KeyedStateFunction that will be applied to the state of every key associated with the provided stateDescriptor.
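For example, here is a minimal, untested sketch of that pattern; Rule, Event, and EnrichedEvent are placeholder types, and the state names are illustrative:

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    public class RuleEnricher
            extends KeyedBroadcastProcessFunction<String, Event, Rule, EnrichedEvent> {

        // Broadcast side: the rules, replicated to every parallel instance.
        public static final MapStateDescriptor<String, Rule> RULES_DESC =
                new MapStateDescriptor<>("rules", String.class, Rule.class);

        // Keyed side: a per-key copy of the rule, if you want one.
        private static final ValueStateDescriptor<Rule> CACHED_RULE_DESC =
                new ValueStateDescriptor<>("cachedRule", Rule.class);

        @Override
        public void processElement(Event event, ReadOnlyContext ctx,
                                   Collector<EnrichedEvent> out) throws Exception {
            Rule rule = ctx.getBroadcastState(RULES_DESC).get(event.getKey());
            out.collect(new EnrichedEvent(event, rule));
        }

        @Override
        public void processBroadcastElement(Rule rule, Context ctx,
                                            Collector<EnrichedEvent> out) throws Exception {
            ctx.getBroadcastState(RULES_DESC).put(rule.getKey(), rule);
            // Push the update into the keyed state of the matching key as well.
            ctx.applyToKeyedState(CACHED_RULE_DESC, (key, state) -> {
                if (key.equals(rule.getKey())) {
                    state.update(rule);
                }
            });
        }
    }

Wiring it up would look something like events.keyBy(Event::getKey).connect(ruleUpdates.broadcast(RuleEnricher.RULES_DESC)).process(new RuleEnricher()).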

State Processor API

If you want to bootstrap state in a Flink savepoint from a database dump, you can do that with this library. You'll find a simple example of using the State Processor API to bootstrap state in this gist.
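Compressed into an untested sketch, the shape of that example is roughly this (using the DataSet-based State Processor API as of Flink 1.12; Rule and its fields are placeholders):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.state.api.BootstrapTransformation;
    import org.apache.flink.state.api.OperatorTransformation;
    import org.apache.flink.state.api.Savepoint;
    import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

    public class BootstrapRules {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Stand-in for reading the database dump.
            DataSet<Rule> rules = env.fromElements(
                    new Rule("key-1", "..."), new Rule("key-2", "..."));

            BootstrapTransformation<Rule> transformation = OperatorTransformation
                    .bootstrapWith(rules)
                    .keyBy(new KeySelector<Rule, String>() {
                        @Override
                        public String getKey(Rule rule) {
                            return rule.getKey();
                        }
                    })
                    .transform(new RuleBootstrapper());

            // The uid must match the stateful operator in the streaming job.
            Savepoint.create(new MemoryStateBackend(), 128)
                    .withOperator("rule-cache", transformation)
                    .write("file:///tmp/rules-savepoint");

            env.execute();
        }

        static class RuleBootstrapper extends KeyedStateBootstrapFunction<String, Rule> {

            private transient ValueState<Rule> ruleState;

            @Override
            public void open(Configuration parameters) {
                ruleState = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("cachedRule", Rule.class));
            }

            @Override
            public void processElement(Rule rule, Context ctx) throws Exception {
                ruleState.update(rule);
            }
        }
    }

The streaming job is then started from the resulting savepoint, and its operator with uid "rule-cache" finds its keyed state pre-populated.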

Change Data Capture

The Table/SQL API supports Debezium, Canal, and Maxwell CDC streams, as well as Kafka upsert streams. This may be a solution. There's also flink-cdc-connectors.
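As for parsing the connector's SourceRecord into a POJO (Jan's question): with flink-cdc-connectors you can plug in a custom DebeziumDeserializationSchema. A rough, untested sketch, where Rule is a placeholder POJO, the package name is from the 1.x releases, and the field names depend on your table:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.source.SourceRecord;

    import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;

    public class RuleDeserializer implements DebeziumDeserializationSchema<Rule> {

        @Override
        public void deserialize(SourceRecord record, Collector<Rule> out) {
            // The Debezium envelope carries "before"/"after" row images;
            // "after" is null for deletes.
            Struct value = (Struct) record.value();
            Struct after = value.getStruct("after");
            if (after != null) {
                out.collect(new Rule(after.getString("rule_key"),
                                     after.getString("payload")));
            }
        }

        @Override
        public TypeInformation<Rule> getProducedType() {
            return TypeInformation.of(Rule.class);
        }
    }

You would then pass an instance of this deserializer to the CDC source builder in place of the default one.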

Lookup Joins

Flink SQL can do temporal lookup joins against a JDBC database, with a configurable cache. Not sure whether this is relevant to your use case.
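For reference, a sketch of what that looks like; it is untested, the table and column names are made up, and the cache options are those of the Flink 1.12 JDBC connector:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class LookupJoinExample {

        public static void main(String[] args) {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            // Stand-in event source with a processing-time attribute.
            tableEnv.executeSql(
                    "CREATE TABLE events (" +
                    "  id STRING," +
                    "  rule_key STRING," +
                    "  proc_time AS PROCTIME()" +
                    ") WITH ('connector' = 'datagen')");

            // The rules table, read via JDBC with a bounded lookup cache.
            tableEnv.executeSql(
                    "CREATE TABLE rules (" +
                    "  rule_key STRING," +
                    "  payload STRING" +
                    ") WITH (" +
                    "  'connector' = 'jdbc'," +
                    "  'url' = 'jdbc:mysql://localhost:3306/rulesdb'," +
                    "  'table-name' = 'rules'," +
                    "  'lookup.cache.max-rows' = '10000'," +
                    "  'lookup.cache.ttl' = '10min')");

            // Each event is enriched with the rule as of processing time;
            // lookups hit the cache first, then the database.
            tableEnv.executeSql(
                    "SELECT e.id, r.payload " +
                    "FROM events AS e " +
                    "JOIN rules FOR SYSTEM_TIME AS OF e.proc_time AS r " +
                    "ON e.rule_key = r.rule_key").print();
        }
    }

Note that with the cache enabled, a lookup can return a rule that is stale by up to the configured TTL.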

