LookupableTableSource question

LookupableTableSource question

Flavio Pompermaier
Hi to all,
I have a use case where I'd like to enrich a stream using a rarely updated lookup table.
Basically, I'd like to set a refresh policy that is triggered either when a key is not found (a new key has probably been added in the meantime) or when a configurable refresh period has elapsed.

Is there a suggested solution for this? LookupableTableSource looks very similar to what I'd like to achieve, but I can't find a real-world example that uses it, and it lacks both of these requirements: key-values are not refreshed after a configurable timeout, and a KeyNotFound callback cannot be handled.
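
To make the two requirements concrete, here is a minimal sketch in plain Java (no Flink dependency) of the kind of refresh policy described above. The class RefreshingLookup, its loader and the injected clock are hypothetical names used only for illustration; they are not part of any Flink API. The whole table is reloaded when the refresh period has elapsed or when a key is not found.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical helper: caches a small lookup table and reloads it on a miss or after a timeout. */
class RefreshingLookup<K, V> {
    private final Supplier<Map<K, V>> loader; // loads the full lookup table, e.g. from a REST service
    private final long refreshPeriodMs;       // configurable refresh period
    private final LongSupplier clock;         // injected so the policy can be exercised without sleeping
    private Map<K, V> table = new HashMap<>();
    private long lastLoadMs;
    private boolean loaded;

    RefreshingLookup(Supplier<Map<K, V>> loader, long refreshPeriodMs, LongSupplier clock) {
        this.loader = loader;
        this.refreshPeriodMs = refreshPeriodMs;
        this.clock = clock;
    }

    /** Returns the value for the key, or null if the key is still unknown after a refresh. */
    V get(K key) {
        long now = clock.getAsLong();
        boolean stale = !loaded || now - lastLoadMs >= refreshPeriodMs;
        // Reload when the period has elapsed, or when the key is missing
        // (a new key may have been added in the meantime).
        if (stale || !table.containsKey(key)) {
            reload(now);
        }
        return table.get(key);
    }

    private void reload(long now) {
        table = new HashMap<>(loader.get()); // defensive copy of the loaded snapshot
        lastLoadMs = now;
        loaded = true;
    }
}
```

Note that in this sketch every lookup of a key that really does not exist triggers a reload, so a real implementation would probably want to rate-limit miss-triggered refreshes.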

Any help is appreciated,
Flavio

Re: LookupableTableSource question

JingsongLee
Hi Flavio:

I just implemented a JDBCLookupFunction[1]. You can use it as a table function[2], or use
the Blink temporal table join[3] (which needs Blink planner support).
I added a Google Guava cache to JDBCLookupFunction with a configurable cacheMaxSize
(to avoid running out of memory) and cacheExpireMs (to keep the lookup table fresh).
Is that what you want?
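
Roughly, the two settings behave like the following plain-JDK sketch. This is not the actual JDBCLookupFunction code (which uses Guava); the class BoundedExpiringCache and the injected clock are made up for illustration, and only the names cacheMaxSize and cacheExpireMs come from the description above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for a size-bounded cache whose entries expire some time after being written. */
class BoundedExpiringCache<K, V> {
    /** A cached value together with the time it was written. */
    private static final class Timestamped<T> {
        final T value;
        final long writtenAtMs;

        Timestamped(T value, long writtenAtMs) {
            this.value = value;
            this.writtenAtMs = writtenAtMs;
        }
    }

    private final long cacheExpireMs;
    private final LongSupplier clock; // injected so expiry can be exercised without sleeping
    private final LinkedHashMap<K, Timestamped<V>> entries;

    BoundedExpiringCache(final int cacheMaxSize, long cacheExpireMs, LongSupplier clock) {
        this.cacheExpireMs = cacheExpireMs;
        this.clock = clock;
        // Access-ordered map that drops the least recently used entry once cacheMaxSize is exceeded.
        this.entries = new LinkedHashMap<K, Timestamped<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Timestamped<V>> eldest) {
                return size() > cacheMaxSize;
            }
        };
    }

    void put(K key, V value) {
        entries.put(key, new Timestamped<V>(value, clock.getAsLong()));
    }

    /** Returns the cached value, or null if it is absent or older than cacheExpireMs. */
    V getIfPresent(K key) {
        Timestamped<V> entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (clock.getAsLong() - entry.writtenAtMs >= cacheExpireMs) {
            entries.remove(key); // expired: the caller has to query the lookup table again
            return null;
        }
        return entry.value;
    }
}
```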


 Best, JingsongLee

------------------------------------------------------------------
From: Flavio Pompermaier <[hidden email]>
Send Time: Friday, June 28, 2019, 21:04
To: user <[hidden email]>
Subject: LookupableTableSource question


Re: LookupableTableSource question

Flavio Pompermaier
This could be a good fit. I'll dig into it and see whether it can be adapted to a REST service.
The only strange thing I see is that the local cache is keyed by the whole block of keys. Am I wrong?
Shouldn't it iterate over the list of passed keys?

Right now it's the following:

Cache<Row, List<Row>> cache;

public void eval(Object... keys) {
    Row keyRow = Row.of(keys);
    if (cache != null) {
        List<Row> cachedRows = cache.getIfPresent(keyRow);
        if (cachedRows != null) {
            for (Row cachedRow : cachedRows) {
                collect(cachedRow);
            }
            return;
        }
    }   
 ...

while I'd use the following (also for JDBC):

Cache<Row, List<Row>> cache;

public void eval(Object... keys) {
    Row keyRow = Row.of(keys);
    if (cache != null) {
        List<Row> cachedRows = cache.getIfPresent(keyRow);
        if (cachedRows != null) {
            for (Row cachedRow : cachedRows) {
                collect(cachedRow);
            }
            return;
        }
    }   
 ...

public void eval(Object... keys) {
    for (Object kkk : keys) {
        Row keyRow = Row.of(kkk);
        if (cache != null) {
            List<Row> cachedRows = cache.getIfPresent(keyRow);
            if (cachedRows != null) {
                for (Row cachedRow : cachedRows) {
                    collect(cachedRow);
                }
                return;
            }
        }
    }
 ...

Am I missing something?


In reply to JingsongLee <[hidden email]>, Fri, Jun 28, 2019 at 4:18 PM

Re: LookupableTableSource question

Flavio Pompermaier
Sorry, I pasted the current eval method twice. I'd do this:

public void eval(Object... keys) {
    for (Object kkk : keys) {
        Row keyRow = Row.of(kkk);
        if (cache != null) {
            List<Row> cachedRows = cache.getIfPresent(keyRow);
            if (cachedRows != null) {
                for (Row cachedRow : cachedRows) {
                    collect(cachedRow);
                }
                return;
            }
        }
    }
 ...

In reply to Flavio Pompermaier <[hidden email]>, Fri, Jun 28, 2019 at 4:51 PM

Re: LookupableTableSource question

JingsongLee
The keys are the fields of a joint (composite) primary key, not a list of separate keys. In your case, maybe there is just a single key?
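
For example, here is a small sketch with plain Java collections standing in for Flink's Row and collector (the class CompositeKeyLookup and its fields are hypothetical, only eval(Object... keys) mirrors the method discussed above). Each call to eval receives all fields of one key, so the whole varargs array forms a single cache key:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical lookup: one eval() call is one lookup with a (possibly composite) key. */
class CompositeKeyLookup {
    // Cache keyed by the full key tuple; a List compares by value, so equal tuples hit the same entry.
    final Map<List<Object>, List<String>> cache = new HashMap<>();
    // Stands in for collect(row) in a real table function.
    final List<String> collected = new ArrayList<>();

    void eval(Object... keys) {
        // All passed fields together identify one entry; they are not looked up one by one.
        List<Object> keyRow = Arrays.asList(keys);
        List<String> cachedRows = cache.get(keyRow);
        if (cachedRows != null) {
            collected.addAll(cachedRows);
        }
    }
}
```

With a cache entry stored under the tuple ("IT", 42), only eval("IT", 42) finds it; eval("IT") or eval(42, "IT") are lookups for different keys.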

Best, Jingsong Lee


------------------Original Mail ------------------
From:Flavio Pompermaier <[hidden email]>
Date:2019-06-28 22:53:31
Recipient:JingsongLee <[hidden email]>
CC:user <[hidden email]>
Subject:Re: LookupableTableSource question

Re: LookupableTableSource question

Flavio Pompermaier
I probably misunderstood the meaning of eval(). So it is called once for every distinct key (which could be composed of a combination of fields)?
So, the other question is: how do I enable Blink planner support?
Since when has LATERAL TABLE been available in Flink? Is it equivalent to using temporal tables [1]?


Best,
Flavio

In reply to JingsongLee <[hidden email]>, Sat, Jun 29, 2019 at 3:16 AM

Re: LookupableTableSource question

JingsongLee
>how do I enable Blink planner support?
After the Flink 1.9 release, you can try the Blink planner.

>Since when is LATERAL TABLE available in Flink? Is it equivalent to using temporal tables?
LATERAL TABLE applies a table function to each row of a table; it has been available in Flink for a long time.[1]
It is different from a temporal table.
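
To illustrate the difference, the two query shapes look roughly like this. The table, column and function names (orders, customers, lookup, proctime, and so on) are invented for the example, and the queries are only held as strings here so that the snippet needs no Flink dependency:

```java
/** Hypothetical queries; table, column and function names are made up for illustration. */
final class LookupQueries {
    // Table function via LATERAL TABLE: lookup(...) would be a registered table function
    // that is evaluated for each row of orders.
    static final String LATERAL_TABLE_FUNCTION =
        "SELECT o.order_id, c.name "
            + "FROM orders AS o, LATERAL TABLE(lookup(o.customer_id)) AS c(name)";

    // Temporal table join (Blink planner): customers would be a lookupable table
    // that is joined as of the processing time of each orders row.
    static final String TEMPORAL_TABLE_JOIN =
        "SELECT o.order_id, c.name "
            + "FROM orders AS o "
            + "JOIN customers FOR SYSTEM_TIME AS OF o.proctime AS c "
            + "ON o.customer_id = c.id";

    private LookupQueries() {
    }
}
```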


Best, JingsongLee

------------------------------------------------------------------
From:Flavio Pompermaier <[hidden email]>
Send Time: Monday, July 1, 2019, 21:26
To:JingsongLee <[hidden email]>
Cc:user <[hidden email]>
Subject:Re: LookupableTableSource question
