Hi to all,
I have a use case where I'd like to enrich a stream using a rarely updated lookup table. Basically, I'd like to be able to set a refresh policy that is triggered either when a key was not found (a new key has probably been added in the mean time) or a configurable refresh-period has elapsed. Is there any suggested solution to this? The LookupableTableSource looks very similar to what I'd like to achieve but I can't find a real-world example using it and it lacks of such 2 requirements (key-values are not refreshed after a configurable timeout and a KeyNotFound callback cannot be handled). Any help is appreciated, Flavio |
Hi Flavio: I just implement a JDBCLookupFunction[1]. You can use it as table function[2]. Or use blink temporal table join[3] (Need blink planner support). I add a google guava cache in JDBCLookupFunction with configurable cacheMaxSize (avoid memory OOM) and cacheExpireMs(For the fresh of lookup table). Is that you want? Best, JingsongLee
|
This could be a good fit, I'll try to dig into it and see if it can be adapted to a REST service. The only strange thing I see is that the key of the local cache is per block of keys..am I wrong? Shouldn't it cycle over the list of passed keys? Right now it's the following: Cache<Row, List<Row>> cache; public void eval(Object... keys) { Row keyRow = Row.of(keys); if (cache != null) { List<Row> cachedRows = cache.getIfPresent(keyRow); if (cachedRows != null) { for (Row cachedRow : cachedRows) { collect(cachedRow); } return; } } ... while I'd use the following (also for JDBC): Cache<Row, List<Row>> cache; public void eval(Object... keys) { Row keyRow = Row.of(keys); if (cache != null) { List<Row> cachedRows = cache.getIfPresent(keyRow); if (cachedRows != null) { for (Row cachedRow : cachedRows) { collect(cachedRow); } return; } } ... public void eval(Object... keys) { for (Object kkk : keys) { Row keyRow = Row.of(kkk); if (cache != null) { List<Row> cachedRows = cache.getIfPresent(keyRow); if (cachedRows != null) { for (Row cachedRow : cachedRows) { collect(cachedRow); } return; } } } ... Am I missing something? On Fri, Jun 28, 2019 at 4:18 PM JingsongLee <[hidden email]> wrote:
|
Sorry I copied and pasted twice the current eval method...I'd do this: public void eval(Object... keys) { for (Object kkk : keys) { Row keyRow = Row.of(kkk); if (cache != null) { List<Row> cachedRows = cache.getIfPresent(keyRow); if (cachedRows != null) { for (Row cachedRow : cachedRows) { collect(cachedRow); } return; } } } ... On Fri, Jun 28, 2019 at 4:51 PM Flavio Pompermaier <[hidden email]> wrote:
|
The keys means joint primary keys, it is not list of keys, in your case, maybe there is a single key?
Best, Jingsong Lee
|
I probably messed up with the meaning of eval()..thus it is called once for every distinct key (that could be composed by a combination of fields)? So, the other question is..how do I enable Blink planner support? Since when is LATERAL TABLE available in Flink? Is it equivalent to using temporal tables [1]? Best, Flavio On Sat, Jun 29, 2019 at 3:16 AM JingsongLee <[hidden email]> wrote: The keys means joint primary keys, it is not list of keys, in your case, maybe there is a single key? |
> how do I enable Blink planner support? After flink-1.9 release, you can try Blink-planner. >Since when is LATERAL TABLE available in Flink? Is it equivalent to using temporal tables? LATERAL TABLE is table function in table, it is available in Flink for a long time.[1] It is different from temporal table. Best, JingsongLee
|
Free forum by Nabble | Edit this page |