Hi,

Following on from an earlier email, my approach has changed, but I am still unsure how best to achieve my goal.

I have records coming through a Kinesis stream into Flink:

    {
      id: <string>
      var1: <string>
      ...
    }

'id' needs to be replaced with a value from a DB store. If it is not present in the DB, Flink should generate a new ID, cache the value and then store it back in the DB. This is essentially a basic ID mapping service.

Currently, for each record I use async I/O to either get the value from Dynamo or generate a new value and write it back to the DB. This is unnecessary, because I should be able to cache the value after the first time it is seen or generated.

What I want to do is cache the value in some form of local state after the first DB fetch, while still updating the DB. I am unsure which of the APIs I should use to do this.

Currently my code looks something like:

    KeyedStream<Pojo, String> source = getKinesisSource().keyBy(pojo -> pojo.id);

    SingleOutputStreamOperator<Pojo2> ps = AsyncDataStream
        .unorderedWait(source, new DynoProcessingCode(), .. ..)
        .process(new processFunction());

    class processFunction extends ProcessFunction<Pojo, O> { .. }

I could insert a KeyedProcessFunction after the keyBy and before the async I/O. It would skip the async lookup whenever the ID is already in the cache. But when I do need to fetch from the DB, how do I store the result in the keyed cache from within the async I/O operator? It seems that this may not be possible. Should I use operator state instead?

Any help appreciated.

Thanks,
O
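The async I/O operator cannot use keyed state. One workaround is to keep the cache as a plain field of the async function itself. The following is a minimal, JDK-only sketch built on several assumptions:

- `CachingIdMapper` is a hypothetical name.
- `lookup` and `writeBack` are hypothetical stand-ins for the Dynamo read and write that `DynoProcessingCode` performs. Its internals are not shown in the email.
- Generating new IDs with `UUID.randomUUID()` is also an assumption.

The map holds one `CompletableFuture` per source ID. Repeated or concurrent records with the same ID therefore share a single lookup, and a missing mapping is generated and written back only once.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Hypothetical per-instance ID-mapping cache placed in front of a DB lookup. */
class CachingIdMapper {
    // One future per source id: later records with the same id reuse it,
    // so the DB is queried (or a new id generated) at most once per id.
    private final Map<String, CompletableFuture<String>> cache = new ConcurrentHashMap<>();
    private final Function<String, Optional<String>> lookup; // hypothetical DB read
    private final BiConsumer<String, String> writeBack;      // hypothetical DB write

    CachingIdMapper(Function<String, Optional<String>> lookup,
                    BiConsumer<String, String> writeBack) {
        this.lookup = lookup;
        this.writeBack = writeBack;
    }

    /** Returns the mapped id, resolving it asynchronously on the first request. */
    CompletableFuture<String> map(String sourceId) {
        // supplyAsync runs on the common ForkJoinPool; a real job would
        // more likely pass a dedicated executor or use an async DB client.
        CompletableFuture<String> f = cache.computeIfAbsent(sourceId,
                id -> CompletableFuture.supplyAsync(() -> resolve(id)));
        // Evict a failed lookup so a later record with this id can retry.
        f.whenComplete((value, err) -> {
            if (err != null) {
                cache.remove(sourceId, f);
            }
        });
        return f;
    }

    private String resolve(String id) {
        // Use the stored mapping if present; otherwise generate one and persist it.
        return lookup.apply(id).orElseGet(() -> {
            String generated = UUID.randomUUID().toString();
            writeBack.accept(id, generated);
            return generated;
        });
    }
}
```

Inside a Flink `AsyncFunction`, `asyncInvoke(input, resultFuture)` could call `map(input.id)` and complete the `ResultFuture` when the returned future finishes. The stream is keyed by `id` before the async operator, so each parallel instance should only see its own subset of IDs. As a result, the per-instance cache behaves much like a per-key cache.

This design has trade-offs. The cache is unbounded, and it is not part of Flink's checkpointed state. It starts empty after a restart and is refilled from the DB, and a bounded or expiring cache may be needed for large key spaces.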
Hi Orionemail,

There is no simple way to access state in the async I/O operator. I think this would require a custom caching solution. Maybe other community users have solved this problem in some other way.

Best,
Andrey

On Mon, Jun 8, 2020 at 5:33 PM orionemail <[hidden email]> wrote: