Using managed keyed state with Async I/O


Using managed keyed state with Async I/O

KristoffSC
Hi guys,
I'm using Flink 1.9.2

I have a question about a use case where I would like to use Flink's managed
keyed state with Async I/O [1].


Let's take the example below, taken from [1], as a baseline, and let's assume that
we are executing it on a keyed stream.

// This runs inside AsyncFunction#asyncInvoke(key, resultFuture):
final Future<String> result = client.query(key);

CompletableFuture.supplyAsync(new Supplier<String>() {

    @Override
    public String get() {
        try {
            return result.get();
        } catch (InterruptedException | ExecutionException e) {
            // Normally handled explicitly.
            return null;
        }
    }
}).thenAccept((String dbResult) -> {
    resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
});


Imagine that, instead of passing the key to client.query(..), we pass some value
taken from Flink's managed keyed state. Later, the supplier's get method returns
a value that should be stored back in that state. In other words, we use
previous results as inputs for the next computations.
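
Roughly, what I would like to write is something like the sketch below (DatabaseClient is a made-up client that returns a CompletableFuture<String>, and the state name is made up as well):

import java.util.Collections;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class EnrichWithStateFunction
        extends RichAsyncFunction<Tuple2<String, String>, Tuple2<String, String>> {

    private transient ValueState<String> previousResult;   // managed keyed state
    private transient DatabaseClient client;                // made-up external client

    @Override
    public void open(Configuration parameters) {
        previousResult = getRuntimeContext().getState(
                new ValueStateDescriptor<>("previousResult", String.class));
        client = new DatabaseClient();
    }

    @Override
    public void asyncInvoke(Tuple2<String, String> input,
                            ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
        // query with the previously stored value instead of the plain key ...
        client.query(previousResult.value()).thenAccept((String dbResult) -> {
            // ... and ideally store the new result back into that state here,
            // even though completions for the same key may happen concurrently
            resultFuture.complete(Collections.singleton(Tuple2.of(input.f0, dbResult)));
        });
    }
}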

Is this achievable with Flink's Async I/O? I can have many pending requests on
client.query which can finish in a random order. AsyncDataStream.orderedWait
will not help here, since it only affects the order in which Flink "releases"
messages from its internal queue for async operators.


What is more, this scenario can result in multiple concurrent writes/reads
to/from Flink's managed state for the same key values. Is this thread safe?


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html




Re: Using managed keyed state with Async I/O

Arvid Heise-3
Hi KristoffSC,

Afaik async I/O does not support state operations at all because of the issues you mentioned (RichAsyncFunction fails if you access state).

I'd probably solve it by having a map or process function before and after the async I/O for the state operations. If you enable object reuse, performance should be pretty much the same as if async I/O supported it, but the threading model becomes much easier.

So, the pipeline is source -> keyBy -> map (retrieve state) -> async IO (use state) -> map (update state). You might need to return Tuple<Key, State> from the map and the async IO to have the full context information in the subsequent operators.
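
A rough sketch of that topology could look like this (StateReadMapper, ExternalLookup and StateWriteMapper are invented names for a keyed RichMapFunction, a plain AsyncFunction and another keyed RichMapFunction; records are assumed to be tuples of key, payload and state value):

// Fragment of the job's main(); the three functions are placeholders, not Flink API.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();                 // keeps the extra hops cheap

DataStream<Tuple2<String, String>> source =          // (key, payload)
        env.fromElements(Tuple2.of("key-1", "payload-1"));

// 1) keyed map that reads the managed state and attaches it to the record
DataStream<Tuple3<String, String, String>> withState = source
        .keyBy(0)                                    // key by the first tuple field
        .map(new StateReadMapper());                 // emits (key, payload, stateValue)

// 2) stateless async I/O stage that only uses the state value carried in the record
DataStream<Tuple3<String, String, String>> enriched = AsyncDataStream
        .orderedWait(withState, new ExternalLookup(), 30, TimeUnit.SECONDS, 100);

// 3) keyed map that writes the async result back into the managed state
enriched
        .keyBy(0)
        .map(new StateWriteMapper())
        .print();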




Re: Using managed keyed state with Async I/O

KristoffSC
Hi Arvid,
thank you for the response.
Yeah, I tried to run my job shortly after posting my message and I got "State
is not supported in rich async function" ;)

I came up with a solution that would solve my initial problem - the
concurrent/async processing of messages with the same key - but
unfortunately state is not supported there.

Thank you for the proposed pipeline
source -> keyBy -> map (retrieve state) -> async IO (use state) -> map (update state)

However, I'm a little bit surprised. I thought that keyed state cannot
be shared between operators, yet here you are suggesting exactly that. Is it
possible then?


Using this occasion, I have an additional question: is there any difference,
from Flink's perspective, between these two approaches?

MyProcessFunction pf = new MyProcessFunction(); // MyProcessFunction is a
stateless object, but it uses Flink keyed state.

Setup 1:
source -> keyBy(key) -> process(pf) -> map() -> process(pf) -> sink

Setup 2:
source -> keyBy(key) -> process(new MyProcessFunction()) -> map() -> process(new MyProcessFunction()) -> sink




Re: Using managed keyed state with Async I/O

Arvid Heise-3
Hi KristoffSC,

You are right that state is not shared across operators - I forgot about that. So the approach is only valid as-is if the state can be properly separated into two independent parts. For example, you need the state to find the database key, and you store the full entry in Flink state afterwards. Then you could fetch the key in the map before the async IO and keep the full record in the map after the async IO.
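
For illustration, the two keyed functions around the async stage could then look roughly like this (all names are made up; records are assumed to be Tuple2<String, String> of business key and payload before the async stage, and Tuple3<String, String, String> with the fetched entry appended after it; both functions have to run on a keyed stream):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;

// Before async I/O: only needs the state that maps the business key to the database key.
class DbKeyLookupMapper
        extends RichMapFunction<Tuple2<String, String>, Tuple3<String, String, String>> {

    private transient ValueState<String> dbKey;

    @Override
    public void open(Configuration parameters) {
        dbKey = getRuntimeContext().getState(
                new ValueStateDescriptor<>("dbKey", String.class));
    }

    @Override
    public Tuple3<String, String, String> map(Tuple2<String, String> record) throws Exception {
        // (businessKey, payload, dbKey); dbKey may be null if nothing was stored yet
        return Tuple3.of(record.f0, record.f1, dbKey.value());
    }
}

// After async I/O: only stores the full entry that the async stage fetched.
class FullEntryStoringMapper
        extends RichMapFunction<Tuple3<String, String, String>, Tuple2<String, String>> {

    private transient ValueState<String> fullEntry;

    @Override
    public void open(Configuration parameters) {
        fullEntry = getRuntimeContext().getState(
                new ValueStateDescriptor<>("fullEntry", String.class));
    }

    @Override
    public Tuple2<String, String> map(Tuple3<String, String, String> enriched) throws Exception {
        fullEntry.update(enriched.f2);               // entry returned by the async call
        return Tuple2.of(enriched.f0, enriched.f2);
    }
}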

Another approach is to perform some kind of feedback from the async IO to the first map. There is usually a tradeoff between performance (use a Kafka topic for the feedback) and complexity (write some TCP socket magic). I'd rather recommend having a look at statefun [1] though, which implements this feedback in an efficient way and provides a good abstraction for everything that is state-related. Unfortunately, mixing Flink jobs and statefun applications is still not easily possible - I'm assuming that will happen in the next major release. But maybe you can express everything in statefun, at which point it's the best choice.

For your question: it shouldn't make any difference, as the function gets serialized in main() and deserialized at each JM/TM, resulting in many copies. The only difference is that in your main() you have one copy fewer. Since Flink state is only touched on the TM, the function instances are different anyway.





Re: Using managed keyed state with Async I/O

KristoffSC
Thanks Arvid,
I like your propositions. In my case I wanted to use the state value to
decide whether I should do the async call to the external system at all. The
result of this call would be an input to the state. So something like this:

Process1 (calculate the value or take it from state) -> async call to the
external system to persist/validate the value -> Process2 (feedback loop via
a messaging queue back to Process1).

Apart from the fact that Process1 would have to consume two streams, which is
ok, I would actually have a delay. I wanted to avoid unnecessary calls to the
external system by having the cached/validated value in state.

And this could be done without the delay if I could use state in async
operators.


I'm thinking about manufacturing my own semi-async operator. My idea is to
have a normal KeyedProcessFunction that wraps a list of SingleThreadExecutors.

In the processElement method I would use the key to calculate an index into
that list, to make sure that messages for the same key go to the same
executor. I do want to keep the message order.

I will submit a task like:

// pick the executor for this key so that messages with the same key keep their order
ExecutorService executor = executors.get(Math.abs(key.hashCode() % executors.size()));
executor.submit(() -> {
    MyResult result = rservice.process(message, mapState.get(key));
    mapState.put(key, result);
    out.collect(newMessage);
});



Big questions:
1. In my solution, out.collect(newMessage) will be called from a few threads
(each with a different message). Is that thread safe?
2. Is using MapState in a multi-threaded environment, like I would have here,
thread safe?
Alternatively, I could have an associated list of MapStates, one per
SingleThreadExecutor, so each would be used by only one thread.

With this setup I would not block my pipeline and I would be able to use
state. I agree that the size of the SingleThreadExecutors list would be a
limiting factor.


Is this setup possible with Flink?


Btw, I will use the RocksDB state backend.







Re: Using managed keyed state with Async I/O

Arvid Heise-3
Hi Kristoff,

The answer to both of your big questions is unfortunately no. I see two options in general:

1) A process function (as you proposed). In processElement, you'd read the state and invoke your async operation. You enqueue the result in some result queue and emit it in the next call of processElement. To deal with rare keys, you'd probably also want to use a timer to flush the outputs. In the timer / next processElement, you can also access the keyed state. However, you also need to ensure that these pending results are snapshotted, so that they are not lost on a crash. I'd expect that you can mix ProcessFunction and CheckpointedFunction, but I haven't done it myself yet. (A rough sketch of this option follows after this list.)

2) Implement your own operator, where you can start by copying or subclassing the existing AsyncWaitOperator [1]. One thing you need to look out for is to access the state and the output collector only in the mailbox thread (= main task thread). You can use the mailboxExecutor to execute a piece of code in the mailbox. (A small fragment illustrating that hand-over follows below.)
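
A minimal sketch of option 1 (everything specific here is an assumption for illustration: AsyncClient is a made-up non-blocking client, records are Tuple2<String, String> of key and payload, only the latest pending result per key is kept, and the snapshotting of pending results is only hinted at in a comment):

import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Placeholder for whatever non-blocking client is used; not part of Flink. */
interface AsyncClient extends Serializable {
    CompletableFuture<String> query(String payload, String previousResult);
}

public class BufferingAsyncProcessFunction
        extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, String>> {

    private final AsyncClient client;
    // I/O callback threads only park results here; they never touch state or the collector.
    private final ConcurrentHashMap<String, String> completed = new ConcurrentHashMap<>();

    private transient ValueState<String> lastResult;

    public BufferingAsyncProcessFunction(AsyncClient client) {
        this.client = client;
    }

    @Override
    public void open(Configuration parameters) {
        lastResult = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastResult", String.class));
    }

    @Override
    public void processElement(Tuple2<String, String> in, Context ctx,
                               Collector<Tuple2<String, String>> out) throws Exception {
        final String key = in.f0;
        flush(key, out);                            // emit a previously completed result for *this* key

        String previous = lastResult.value();       // safe: main thread, current key
        client.query(in.f1, previous)
              .thenAccept(result -> completed.put(key, result));   // I/O thread: only park the result

        // Make sure the parked result is flushed even if no further element arrives for this key.
        // Pending results in 'completed' should also be snapshotted (e.g. via CheckpointedFunction);
        // that part is omitted here.
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 1000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, String>> out) throws Exception {
        flush(ctx.getCurrentKey(), out);            // keyed state of the timer's key is accessible here
    }

    private void flush(String key, Collector<Tuple2<String, String>> out) throws Exception {
        String result = completed.remove(key);
        if (result != null) {
            lastResult.update(result);              // store the async result back into keyed state
            out.collect(Tuple2.of(key, result));
        }
    }
}

And for option 2, the hand-over inside such a custom operator could look roughly like the fragment below (heavily abbreviated; it assumes an operator that, like the current AsyncWaitOperator, is created through an operator factory that hands it the MailboxExecutor; asyncClient, state, the key extraction and error handling are placeholders or omitted):

// Fragment of the custom operator's processElement(StreamRecord<IN> element).
// 'asyncClient', 'state' and 'mailboxExecutor' are fields assumed to be set up elsewhere.
String key = element.getValue().getKey();             // whatever identifies the key of this record
String stateValue = state.value();                    // mailbox/main thread: state access is fine here

asyncClient.query(element.getValue(), stateValue).thenAccept(result ->
        // The callback runs on an I/O thread, so hand the result back to the
        // mailbox thread before touching keyed state or the output.
        mailboxExecutor.execute(() -> {
            setCurrentKey(key);                        // re-scope keyed state to this record's key
            state.update(result);
            output.collect(new StreamRecord<>(result));
        }, "complete async request"));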

Even if you go with 1), have a look at the AsyncWaitOperator, as it should serve as a good template.



