RichFunctions in Flink's Table / SQL API

Piyush Narang

Hi folks,

We were looking to cache some data using Flink’s MapState in one of our UDFs that are called by Flink SQL queries. I was trying to see if there’s a way to set up these state objects via the basic FunctionContext [1] that we’re provided in the Table / SQL UserDefinedFunction class [2], but from what I can see it’s not possible: we only seem to have access to the metric group, the distributed cache, and the job parameters. Is there a way for Table / SQL UDFs to access Flink’s state and store data? Or is this something that isn’t supported / recommended? (If it helps, we’re on Flink 1.9 and using the old SQL planner.)

Our broader use case is to enrich some data coming in via a Kafka stream by reading additional data from DynamoDB. We’d like to cache this data across restarts to cut down on some of the DynamoDB traffic. (Ideally we’d like to move to temporal tables, but I think that requires a migration to Blink first?)

Thanks,

[1] - https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/FunctionContext.html

[2] - https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/UserDefinedFunction.html

-- Piyush

Re: RichFunctions in Flink's Table / SQL API

Timo Walther

Hi Piyush,

unfortunately, UDFs have no direct access to Flink's state. Aggregate
functions are currently the only type of function that can be stateful.
Aggregate functions store their state in an accumulator that is
serialized/deserialized on access, but an accumulator field can be
backed by a so-called DataView [1], which is directly backed by Flink's
state. Maybe it is possible to leverage this functionality.
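
A minimal sketch of the accumulator pattern described above, written against the plain JDK so it runs without Flink on the classpath. All names here (`EnrichAgg`, `EnrichAcc`, and the `lookup` function standing in for a DynamoDB read) are hypothetical. In a real Flink 1.9 job the class would extend `org.apache.flink.table.functions.AggregateFunction<String, EnrichAcc>` and the `cache` field would be declared as `org.apache.flink.table.api.dataview.MapView<String, String>`; here a `java.util.HashMap` stands in for the `MapView`, so nothing in this sketch is actually persisted in Flink state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-JDK sketch of an aggregate function whose accumulator holds a cache.
// Assumption: in Flink this would extend AggregateFunction<String, EnrichAcc>.
public class EnrichAgg {

    // Accumulator: a POJO with public fields and a no-arg constructor.
    public static class EnrichAcc {
        // Stand-in for MapView<String, String>; in Flink a MapView field
        // is backed by MapState instead of being serialized with the accumulator.
        public Map<String, String> cache = new HashMap<>();
        // Most recently enriched value, returned by getValue().
        public String latest;
    }

    // Hypothetical external lookup (e.g. a DynamoDB read). In a real UDF the
    // client would more likely be created in open(FunctionContext).
    private final Function<String, String> lookup;
    // Demo-only counter of external lookups; not part of any Flink state.
    public int lookups = 0;

    public EnrichAgg(Function<String, String> lookup) {
        this.lookup = lookup;
    }

    public EnrichAcc createAccumulator() {
        return new EnrichAcc();
    }

    // Flink discovers accumulate(...) by reflection; the accumulator comes first.
    // Flink's MapView.get/put declare "throws Exception", so a real version
    // of this method would need to declare it as well.
    public void accumulate(EnrichAcc acc, String key) {
        String value = acc.cache.get(key);
        if (value == null) {
            // Cache miss: call the external store once and remember the result.
            value = lookup.apply(key);
            lookups++;
            acc.cache.put(key, value);
        }
        acc.latest = value;
    }

    public String getValue(EnrichAcc acc) {
        return acc.latest;
    }

    public static void main(String[] args) {
        EnrichAgg agg = new EnrichAgg(k -> "enriched-" + k);
        EnrichAcc acc = agg.createAccumulator();
        for (String k : new String[] {"a", "b", "a", "a"}) {
            agg.accumulate(acc, k);
        }
        // prints "enriched-a lookups=2": only the first "a" and "b" miss the cache
        System.out.println(agg.getValue(acc) + " lookups=" + agg.lookups);
    }
}
```

Because Flink scopes an aggregate's accumulator to its GROUP BY key, a MapView-backed cache like this would be partitioned per group rather than shared across the whole job; whether that fits the enrichment use case would need checking.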

I created an issue to track this problem [2], but it is not on the
roadmap yet.

Regards,
Timo

[1]
https://github.com/apache/flink/blob/release-1.9/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java
[2] https://issues.apache.org/jira/browse/FLINK-19371

On 22.09.20 20:28, Piyush Narang wrote:

> [...]

Re: RichFunctions in Flink's Table / SQL API

Piyush Narang

Hi Timo,

Thanks for getting back to me and filing the JIRA. I'll see whether we can rework things to take advantage of aggregate functions.

-- Piyush

On 9/23/20, 3:55 AM, "Timo Walther" wrote:
    [...]