Hi folks,

We were looking to cache some data using Flink’s MapState in one of our UDFs that are called by Flink SQL queries. I was trying to see if there’s a way to set up these state objects via the basic FunctionContext [1] we’re provided in the Table / SQL UserDefinedFunction class [2], but from what I can see it’s not possible. We only seem to have access to the metric group, the distributed cache and the job params. Is there a way for us in Table / SQL UDFs to access Flink’s state and store data? Or is this something that isn’t supported / recommended? (If it helps, we’re on Flink 1.9 and using the old SQL planner.)

Our broader use case is to enrich some data coming in via a Kafka stream by reading additional data from DynamoDB. We’d like to cache this across restarts to cut down on some of the DynamoDB traffic. (Ideally we’d like to move to temporal tables, but I think that requires a migration to Blink first?)

Thanks,
-- Piyush
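For contrast, the hooks FunctionContext does expose in 1.9 (job parameters and the metric group) are enough for a plain per-instance cache inside a scalar UDF. The sketch below assumes exactly that and nothing more; the class `CachedEnrich`, the job parameter key `enrich.cache.max-entries`, the metric name and `lookupRemote` (a placeholder standing in for the DynamoDB read) are all hypothetical. Because the cache is an ordinary heap map rather than Flink state, it is not checkpointed and is lost on restart, which is the limitation raised above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.flink.metrics.Counter;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

/** Hypothetical enrichment UDF with a heap-only LRU cache (NOT Flink state). */
public class CachedEnrich extends ScalarFunction {

    private static final int DEFAULT_MAX_ENTRIES = 10_000;

    // transient: rebuilt in open() on every (re)start, never part of a checkpoint.
    private transient Map<String, String> cache;
    // Stays null if open() was never called, e.g. in a plain unit test.
    private transient Counter misses;

    @Override
    public void open(FunctionContext context) throws Exception {
        // Job parameters and the metric group are among the few things FunctionContext offers.
        int limit = Integer.parseInt(
                context.getJobParameter("enrich.cache.max-entries", String.valueOf(DEFAULT_MAX_ENTRIES)));
        cache = lruCache(limit);
        misses = context.getMetricGroup().counter("enrichCacheMisses");
    }

    /** Access-ordered LinkedHashMap that evicts the least recently used entry beyond {@code limit}. */
    static Map<String, String> lruCache(int limit) {
        return new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > limit;
            }
        };
    }

    public String eval(String key) {
        if (key == null) {
            return null;
        }
        if (cache == null) {
            cache = lruCache(DEFAULT_MAX_ENTRIES); // only reached when open() was skipped
        }
        String cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        if (misses != null) {
            misses.inc();
        }
        String value = lookupRemote(key);
        cache.put(key, value);
        return value;
    }

    /** Hypothetical placeholder for the DynamoDB read; a real client call would go here. */
    protected String lookupRemote(String key) {
        return "enriched:" + key;
    }
}
```

Registered with `tableEnv.registerFunction("enrich", new CachedEnrich())`, it could be called as `SELECT enrich(id) FROM events` (table and column names hypothetical). Each parallel instance keeps its own cache, so hit rates depend on how keys are distributed across subtasks.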
Hi Piyush,
unfortunately, UDFs have no direct access to Flink's state. Aggregate functions are the only type of function that can be stateful at the moment. Aggregate functions store their state in an accumulator that is serialized/deserialized on access, but an accumulator field can be backed by a so-called DataView [1], which is directly backed by Flink's state. Maybe it is possible to leverage this functionality. I created an issue to track this problem [2], but of course this is not on the roadmap so far.

Regards,
Timo

[1] https://github.com/apache/flink/blob/release-1.9/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java
[2] https://issues.apache.org/jira/browse/FLINK-19371

On 22.09.20 20:28, Piyush Narang wrote:
> Hi folks,
>
> We were looking to cache some data using Flink’s MapState in one of our
> UDFs that are called by Flink SQL queries. I was trying to see if
> there’s a way to set up these state objects via the basic
> FunctionContext [1] we’re provided in the Table / SQL
> UserDefinedFunction class [2] but from what I can see it’s not possible.
> We just seem to have access to retrieve the metric group and access to
> the distributed cache / job params. Is there a way for us in Table / SQL
> UDFs to access Flink’s state and store data? Or is this something that
> isn’t supported / recommended? (If it helps we’re on Flink 1.9 and using
> the old SQL planner).
>
> Our broader use-case is to enrich some data coming in via a Kafka stream
> by reading additional data in DynamoDB. We’d like to cache this across
> restarts to cut down on some of the DynamoDb traffic. (Ideally we’d like
> to move to temporal tables, but I think that requires a migration to
> Blink first?)
>
> Thanks,
>
> [1] - https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/FunctionContext.html
>
> [2] - https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/UserDefinedFunction.html
>
> -- Piyush
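As a rough illustration of the DataView approach Timo describes, here is a minimal sketch of an aggregate function whose accumulator holds a `MapView` field. It assumes Flink 1.9 with the old planner and a grouped streaming query; the class name `DistinctCount` and the distinct-count logic are hypothetical and only meant to show where the state-backed map lives, not how the DynamoDB enrichment would be rephrased as an aggregation.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.dataview.MapView;
import org.apache.flink.table.functions.AggregateFunction;

/** Hypothetical example: counts distinct non-null string values per group. */
public class DistinctCount extends AggregateFunction<Long, DistinctCount.Acc> {

    /** Accumulator must be a public POJO so the planner can inspect its fields. */
    public static class Acc {
        // Per the DataView mechanism, the planner may back this field with Flink's
        // MapState in a streaming job; in a plain unit test it is just a heap map.
        public MapView<String, Boolean> seen = new MapView<>(Types.STRING, Types.BOOLEAN);
        public long count = 0L;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    // Called once per input row; Flink resolves this method by name via reflection.
    public void accumulate(Acc acc, String value) throws Exception {
        if (value != null && !acc.seen.contains(value)) {
            acc.seen.put(value, true);
            acc.count += 1;
        }
    }

    @Override
    public Long getValue(Acc acc) {
        return acc.count;
    }
}
```

It could be registered with `tableEnv.registerFunction("distinctCount", new DistinctCount())` and used as `SELECT k, distinctCount(v) FROM t GROUP BY k` (table and column names hypothetical). The sketch has no `retract` method, so it would not work where the planner needs to retract previously accumulated rows.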
Hi Timo,
Thanks for getting back and filing the jira. I'll try to see if there's a way we can rework things to take advantage of the aggregate functions.

-- Piyush

On 9/23/20, 3:55 AM, "Timo Walther" wrote:
> Hi Piyush,
> unfortunately, UDFs have no direct access to Flink's state.