Internal state and external stores conditional usage advice sought (dynamodb asyncIO)

orionemail
Hi,

Following on from an earlier email, my approach has changed, but I am still unsure how best to achieve my goal.

I have records coming through a Kinesis stream into Flink:

{ id: <string>
  var1: <string>
  ...
}

'id' needs to be replaced with a value from a DB store or, if it is not present in the DB, with a new ID generated in Flink; that value should be cached and then stored back in the DB. This is essentially a basic ID mapping service.
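A minimal, framework-free sketch of the get-or-create mapping described above, just to pin down the semantics. `IdStore` and `InMemoryIdStore` are hypothetical stand-ins for the DynamoDB table, and generating new IDs as random UUIDs is an assumption. The "insert only if absent" step matters once several writers can see the same ID at once; against a real table it would be a conditional write, which is likewise an assumption here rather than part of the original setup.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class IdMapping {

    /** Hypothetical abstraction over the external mapping table (e.g. DynamoDB). */
    interface IdStore {
        Optional<String> get(String sourceId);

        /** Stores the mapping only if none exists; returns the value that is now stored. */
        String putIfAbsent(String sourceId, String mappedId);
    }

    /** In-memory stand-in for the table, used only for this sketch. */
    static class InMemoryIdStore implements IdStore {
        private final Map<String, String> table = new ConcurrentHashMap<>();

        public Optional<String> get(String sourceId) {
            return Optional.ofNullable(table.get(sourceId));
        }

        public String putIfAbsent(String sourceId, String mappedId) {
            String existing = table.putIfAbsent(sourceId, mappedId);
            return existing != null ? existing : mappedId;
        }
    }

    /** Returns the mapped ID, generating and storing a new one the first time an ID is seen. */
    static String resolve(IdStore store, String sourceId) {
        return store.get(sourceId).orElseGet(() ->
                // Conditional insert: if another writer stored a mapping first, keep its value.
                store.putIfAbsent(sourceId, UUID.randomUUID().toString()));
    }

    public static void main(String[] args) {
        IdStore store = new InMemoryIdStore();
        String first = resolve(store, "abc");
        String second = resolve(store, "abc");
        System.out.println(first.equals(second)); // true: the mapping is stable
    }
}
```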

Currently, for each record I use asyncIO to get a value from Dynamo, or to generate a new value and write it back to the DB.

This is unnecessary, as I should be able to cache this value after the first time it is seen or generated.

What I want to do is cache the value in some form of local state after the first fetch, while still updating the DB.

My confusion is over which of the APIs I should use to do this.
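For reference, the "cache after first fetch, still update the DB" idea can be sketched independently of any Flink API as a small bounded LRU cache placed in front of the database. This uses only the JDK (`LinkedHashMap` in access order with `removeEldestEntry`); the `lookupOrCreate` function is a hypothetical placeholder for the DynamoDB get-or-create call. Held as a plain field inside a Flink function, such a cache would be per parallel instance and not checkpointed, so after a restart it would simply be refilled from the DB.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Not thread-safe: intended to be called from a single thread. */
public class CachedIdResolver {

    private final Map<String, String> cache;
    private final Function<String, String> lookupOrCreate; // hypothetical DB get-or-create call
    private int dbCalls = 0;

    public CachedIdResolver(int maxEntries, Function<String, String> lookupOrCreate) {
        this.lookupOrCreate = lookupOrCreate;
        // accessOrder = true orders entries from least to most recently used,
        // so removeEldestEntry evicts the LRU entry once the cache exceeds maxEntries.
        this.cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > maxEntries;
            }
        };
    }

    public String resolve(String sourceId) {
        String cached = cache.get(sourceId);
        if (cached != null) {
            return cached; // cache hit: no database round trip
        }
        dbCalls++;
        String mapped = lookupOrCreate.apply(sourceId); // miss: read or create in the DB
        cache.put(sourceId, mapped);
        return mapped;
    }

    public int dbCalls() {
        return dbCalls;
    }

    public static void main(String[] args) {
        CachedIdResolver resolver = new CachedIdResolver(2, id -> "mapped-" + id);
        resolver.resolve("a");
        resolver.resolve("a"); // served from the cache
        resolver.resolve("b");
        resolver.resolve("c"); // evicts "a", the least recently used entry
        resolver.resolve("a"); // miss again
        System.out.println(resolver.dbCalls()); // 4
    }
}
```

The size bound is a design choice: an unbounded map would eventually hold every ID ever seen, which may not fit in memory for a long-running job.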

Currently my code looks something like:

KeyedStream<Pojo, String> source = getKinesisSource().keyBy(pojo -> pojo.id);

// DynoProcessingCode does the DynamoDB lookup, or generates and writes back a new ID
SingleOutputStreamOperator<Pojo2> ps = AsyncDataStream.unorderedWait(
        source,
        new DynoProcessingCode(),
        ..
        ..)
    .process(new processFunction());

class processFunction extends ProcessFunction<Pojo, O> {
    ..
}

If I insert a KeyedProcessFunction after the keyBy and before the asyncIO, I could skip the async call when the ID has already been read into the cache. But if I do need to fetch from the DB, how do I store the result in the keyed cache from within the AsyncIO function? It seems that this may not be possible; should I use operator state instead?

Any help appreciated.

Thanks,

O


Sent with ProtonMail Secure Email.

Re: Internal state and external stores conditional usage advice sought (dynamodb asyncIO)

Andrey Zagrebin
Hi Orionemail,

There is no simple way to access state from the AsyncIO operator. I think this would require a custom caching solution.
Maybe other community users have solved this problem in some other way.
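Since a `RichAsyncFunction` cannot use Flink's keyed state (its runtime context rejects state access), one shape such a custom cache could take is a per-instance map of futures, sketched here outside Flink with plain `CompletableFuture`s. Each entry holds the in-flight or completed lookup for an ID, so repeated and concurrent records with the same ID share a single DB call; failed lookups are dropped so a later record can retry. Inside a `RichAsyncFunction` the map would be a field created in `open()`, and `asyncInvoke` would complete its `ResultFuture` from the returned future; `fetchOrCreate` is a hypothetical placeholder for the DynamoDB get-or-create call. Because the stream is keyed by `id` before the async operator, records with the same ID should arrive at the same parallel instance, which keeps a per-instance cache effective.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class AsyncIdCache {

    // One future per source ID, either still in flight or already completed.
    private final Map<String, CompletableFuture<String>> cache = new ConcurrentHashMap<>();
    private final Function<String, CompletableFuture<String>> fetchOrCreate; // hypothetical DB call

    public AsyncIdCache(Function<String, CompletableFuture<String>> fetchOrCreate) {
        this.fetchOrCreate = fetchOrCreate;
    }

    public CompletableFuture<String> resolve(String sourceId) {
        // Starts a lookup only if no future exists yet for this ID.
        CompletableFuture<String> result = cache.computeIfAbsent(sourceId, fetchOrCreate);
        // Drop failed lookups so a later record retries instead of reusing the failure.
        result.whenComplete((value, error) -> {
            if (error != null) {
                cache.remove(sourceId, result);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger dbCalls = new AtomicInteger();
        AsyncIdCache ids = new AsyncIdCache(id -> CompletableFuture.supplyAsync(() -> {
            dbCalls.incrementAndGet(); // stands in for a DynamoDB read or write
            return "mapped-" + id;
        }, pool));

        CompletableFuture<String> a1 = ids.resolve("a");
        CompletableFuture<String> a2 = ids.resolve("a"); // reuses the first future
        System.out.println(a1.join() + " " + a2.join() + " calls=" + dbCalls.get());
        // prints "mapped-a mapped-a calls=1"
        pool.shutdown();
    }
}
```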

Best,
Andrey


On Mon, Jun 8, 2020 at 5:33 PM orionemail wrote: