Hi guys,
I'm using Flink 1.9.2 and I have a question about a use case where I would like to use Flink's managed keyed state with Async I/O [1]. Let's take the example below from [1] as a baseline and assume that we are executing it on a keyed stream:

    final Future<String> result = client.query(key);

    CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            try {
                return result.get();
            } catch (InterruptedException | ExecutionException e) {
                // Normally handled explicitly.
                return null;
            }
        }
    }).thenAccept((String dbResult) -> {
        resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
    });

Imagine that instead of passing key to client.query(..), we pass some value taken from Flink's managed keyed state. Later, the supplier's get method returns a value that should be stored in that state. In other words, we use previous results as inputs for the next computations.

Is this achievable with Flink's Async I/O? I can have many pending requests on client.query, and they can finish in any order. AsyncDataStream.orderedWait will not help here, since it only affects the order in which Flink "releases" messages from its internal queue for async operators. What is more, this scenario can result in multiple concurrent reads and writes of Flink's managed state for the same key. Is this thread safe?

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
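To make the orderedWait point concrete, here is a small Flink-free model using only java.util.concurrent; the query method is a hypothetical stand-in for client.query(..) that replies after a random delay. Joining the futures in submission order mimics ordered emission, while completionOrder records when replies actually arrived. A state update placed in a completion callback would run in that second, unpredictable order, and on a pool thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OrderedEmissionSketch {

    /** Hypothetical stand-in for client.query(..): replies after a random delay. */
    static CompletableFuture<String> query(int request, ExecutorService pool, Random rnd,
                                           List<Integer> completionOrder) {
        int delayMs = rnd.nextInt(50); // chosen on the calling thread
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            synchronized (completionOrder) {
                completionOrder.add(request); // order in which replies actually arrive
            }
            return "result-" + request;
        }, pool);
    }

    /** Returns results in submission order, regardless of completion order. */
    static List<String> runOrdered(int n, List<Integer> completionOrder) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Random rnd = new Random(42);
        try {
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                pending.add(query(i, pool, rnd, completionOrder));
            }
            List<String> emitted = new ArrayList<>();
            for (CompletableFuture<String> f : pending) {
                emitted.add(f.join()); // like orderedWait: emit in input order
            }
            return emitted;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> completionOrder = new ArrayList<>();
        List<String> emitted = runOrdered(8, completionOrder);
        System.out.println("completed: " + completionOrder);
        System.out.println("emitted:   " + emitted);
    }
}
```

The emitted list always follows input order, whereas the printed completion order typically differs from run to run.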
Hi KristoffSC,

AFAIK async I/O does not support state operations at all, because of the issues you mentioned (RichAsyncFunction fails if you access state). I'd probably solve it by having a map or process function before and after the async I/O for the state operations. If you enable object reuse, performance should be pretty much the same as if async I/O supported state, but the threading model becomes much simpler.

So, the pipeline is: source -> keyBy -> map (retrieve state) -> async IO (use state) -> map (update state). You might need to return Tuple<Key, State> from the map and the async I/O so that the subsequent operators have the full context information.

On Mon, Aug 10, 2020 at 4:24 PM KristoffSC <[hidden email]> wrote:

--
Arvid Heise | Senior Java Developer

Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
Hi Arvid,
thank you for the response. Yeah, I tried to run my job shortly after posting my message and I got "State is not supported in rich async function" ;) I came up with a solution to my initial problem (concurrent/async processing of messages with the same key), but unfortunately state is not supported there.

Thank you for the proposal:

source -> keyBy -> map (retrieve state) -> async IO (use state) -> map (update state)

However, I'm a little bit surprised. I thought that keyed state cannot be shared between operators, and here you are suggesting doing that. Is it possible then?

Taking this opportunity, I have an additional question: is there any difference, from Flink's perspective, between these two approaches?

MyProcessFunction pf = new MyProcessFunction();

MyProcessFunction is a stateless object, but it uses Flink keyed state.

Setup 1: source -> keyBy(key) -> process(pf) -> map() -> process(pf) -> sink
Setup 2: source -> keyBy(key) -> process(new MyProcessFunction()) -> map() -> process(new MyProcessFunction()) -> sink
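Whether Setup 1 and Setup 2 differ comes down to what happens to the function object when the job is submitted. The sketch below is a JDK-only model that assumes plain Java serialization stands in for how Flink ships user functions (Flink's Function interface does extend java.io.Serializable); MyProcessFunction here is a hypothetical empty class. Even when the same instance is serialized twice, deserialization yields two distinct objects.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class FunctionCopiesSketch {

    /** Hypothetical stand-in for a stateless MyProcessFunction. */
    static class MyProcessFunction implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    /** Serializes and deserializes an object: a rough model of shipping a function to a worker. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        MyProcessFunction pf = new MyProcessFunction();
        // Setup 1: the same instance is handed to both process() calls.
        MyProcessFunction first = roundTrip(pf);
        MyProcessFunction second = roundTrip(pf);
        System.out.println("same object after shipping? " + (first == second)); // prints false
    }
}
```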
Hi KristoffSC,

you are right that state is not shared across operators; I forgot about that. So the approach would only be valid as is if the state can be properly separated into two independent subtasks. For example, say you need the state to find the database key, and afterwards you store the full entry in Flink state. Then you could fetch the key in the map before the async I/O and keep the full record in the map after the async I/O.

Another approach is to feed results back from the async I/O to the first map. There is usually a tradeoff between performance (use a Kafka topic for feedback) and complexity (write some TCP socket magic). I'd rather recommend having a look at StateFun [1], though, which implements this feedback efficiently and provides a good abstraction for everything state-related. Unfortunately, mixing Flink jobs and StateFun applications is not easily possible yet; I'm assuming that will happen in the next major release. But if you can express everything in StateFun, it's the best choice.

As for your other question: it shouldn't make any difference, since the function gets serialized in main() and deserialized at each JM/TM, resulting in many copies. The only difference is that in your main() you have one fewer copy. Since Flink state is only touched in the TMs, the function instances are different anyway.

On Thu, Aug 13, 2020 at 2:53 PM KristoffSC <[hidden email]> wrote:
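The separated-state variant can be modeled without Flink as follows. This is a sketch under stated assumptions: the two HashMaps stand in for the keyed state of the map before and the map after the async I/O, the Keyed class is a hypothetical stand-in for Tuple2<Key, Value>, and the database-key and record values are made up. Each stage touches only its own state, and the async stage works purely on its input. The join() only keeps the sketch short; Flink's async operator would forward results without blocking.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SplitStateSketch {

    /** Hypothetical stand-in for Tuple2<Key, Value>: carries the key through the async stage. */
    static final class Keyed {
        final String key;
        final String value;
        Keyed(String key, String value) { this.key = key; this.value = value; }
    }

    /** State owned by the map before async I/O: element key -> database key. */
    static final Map<String, String> dbKeyState = new HashMap<>();
    /** State owned by the map after async I/O: element key -> full record. */
    static final Map<String, String> recordState = new HashMap<>();

    static Keyed lookupDbKey(String key) {
        // Stage 1: only this stage reads/writes dbKeyState.
        String dbKey = dbKeyState.computeIfAbsent(key, k -> "db-" + k);
        return new Keyed(key, dbKey);
    }

    static CompletableFuture<Keyed> fetchRecord(Keyed in, ExecutorService pool) {
        // Stage 2: uses only its input and never touches either state map.
        return CompletableFuture.supplyAsync(() -> new Keyed(in.key, "record-for-" + in.value), pool);
    }

    static String storeRecord(Keyed in) {
        // Stage 3: only this stage reads/writes recordState.
        recordState.put(in.key, in.value);
        return in.value;
    }

    static String process(String key, ExecutorService pool) {
        Keyed withDbKey = lookupDbKey(key);
        Keyed fetched = fetchRecord(withDbKey, pool).join(); // simplification: wait for the reply
        return storeRecord(fetched);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(process("user-1", pool)); // prints record-for-db-user-1
        } finally {
            pool.shutdown();
        }
    }
}
```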
Thanks Arvid,
I like your proposals. In my case, I wanted to use the state value to decide whether I should make the async call to the external system. The result of this call would be a state input. So having this:

Process1 (calculate the value or take it from state) -> AsyncCall to the external system to persist/validate the value -> Process2 (feedback loop via a message queue to Process1)

Apart from Process1 having to consume two streams, which is OK, I would actually have a delay. I wanted to avoid unnecessary calls to the external system by having the cached/validated value in state. And this would be done without the delay if I could use state in async operators.

I'm thinking about building my own semi-async operator. My idea is to have a normal KeyedProcessFunction that wraps a list of SingleThreadExecutors. In the processElement method I will use the key to calculate an index into that list, to make sure that messages for the same key go to the same executor. I do want to keep the message order. I will submit a task like:

    executor.submit(() -> {
        MyResult result = rservice.process(message, mapState.get(key));
        mapState.put(key, result);
        out.collect(newMessage);
    });

Big questions:
1. In my solution, out.collect(newMessage) will be called from several threads (each with a different message). Is it thread safe?
2. Is using MapState in a multi-threaded environment, like I would have here, thread safe? Alternatively, I could associate a list of MapStates, one for each SingleThreadExecutor, so that each is used by only one thread.

With this setup I would not block my pipeline and I would be able to use state. I agree that the size of the SingleThreadExecutors list will be a limiting factor. Is this setup possible with Flink?

Btw, I will use RocksDBStateBackend.
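The key-to-executor routing in this idea can be sketched in plain Java (all names are hypothetical). It deliberately models only the routing: the tasks touch neither Flink state nor the Collector, since those are exactly the parts whose thread safety is in question. Math.floorMod keeps the index non-negative even for negative hash codes, and because each key always maps to the same single-thread executor, tasks for one key run sequentially in submission order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class KeyRoutedExecutors implements AutoCloseable {

    private final List<ExecutorService> executors = new ArrayList<>();

    KeyRoutedExecutors(int size) {
        for (int i = 0; i < size; i++) {
            executors.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Same key, same index; floorMod avoids negative indices for negative hash codes. */
    static int indexFor(Object key, int size) {
        return Math.floorMod(key.hashCode(), size);
    }

    /** Queues the task on the single-thread executor owned by this key's index. */
    void submit(Object key, Runnable task) {
        executors.get(indexFor(key, executors.size())).execute(task);
    }

    /** Stops accepting work and waits for already queued tasks to finish. */
    @Override
    public void close() {
        executors.forEach(ExecutorService::shutdown);
        try {
            for (ExecutorService e : executors) {
                e.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }

    /** Interleaves messages for keys "A" and "B" and returns what key "A" observed, in order. */
    static List<String> demo() {
        List<String> seenForA = Collections.synchronizedList(new ArrayList<>());
        try (KeyRoutedExecutors router = new KeyRoutedExecutors(4)) {
            for (int i = 0; i < 5; i++) {
                final int n = i;
                router.submit("A", () -> seenForA.add("A-" + n));
                router.submit("B", () -> { /* work for another key, possibly on another thread */ });
            }
        }
        synchronized (seenForA) {
            return new ArrayList<>(seenForA);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [A-0, A-1, A-2, A-3, A-4]
    }
}
```

Different keys that hash to the same index also wait for each other, so a slow call for one key delays unrelated keys routed to the same executor.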
Hi Kristoff,

the answer to your big questions is unfortunately no, both times.

I see two options in general:

1) A process function (as you proposed). In processElement, you'd read the state and invoke your async operation. You enqueue the result in some result queue and emit it in the next call of processElement. To deal with rare keys, you'd probably also want to use a timer to flush the outputs instead of waiting for the next element. In the timer/next processElement, you can also access the keyed state. However, you also need to ensure that these pending results are snapshotted, so that they are not lost on a crash. I'd expect that you can mix ProcessFunction and CheckpointedFunction, but I haven't done it myself yet.

2) Implement your own operator, where you can start by copying or subclassing the existing AsyncWaitOperator [1]. One thing you need to look out for is to access the state and output collector only in the mailbox thread (= main task thread). You can use the mailboxExecutor to execute a piece of code in the mailbox.

Even if you go with 1), have a look at the AsyncWaitOperator, as it should serve as a good template.

On Fri, Aug 14, 2020 at 12:14 PM KristoffSC <[hidden email]> wrote:
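Both options rest on the same rule: async threads never touch state or the collector; they only hand results back to the task thread. Here is a Flink-free sketch of that hand-off, assuming a plain LinkedBlockingQueue as the mailbox (this is not Flink's MailboxExecutor API) and a hypothetical externalCall. processElement reads state on the task thread, the async callback only enqueues a Runnable, and the task thread runs queued work between elements and at the end, loosely mirroring option 1's "emit in the next processElement" and option 2's mailbox execution.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class MailboxSketch {

    // Touched only by the task thread (the thread that calls run()).
    final Map<String, Integer> keyedState = new HashMap<>();
    final List<String> output = new ArrayList<>();
    private int inFlight = 0;

    // Async threads may only add work here; the task thread executes it.
    private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
    private final ExecutorService asyncPool = Executors.newFixedThreadPool(4);

    /** Hypothetical external call deriving a new value from the previous state value. */
    static int externalCall(String key, int previous) {
        return previous + 1;
    }

    void processElement(String key) {
        int previous = keyedState.getOrDefault(key, 0); // state read on the task thread
        inFlight++;
        CompletableFuture
            .supplyAsync(() -> externalCall(key, previous), asyncPool)
            .thenAccept(result -> mailbox.add(() -> {
                // Executed later by the task thread: safe to update state and emit.
                keyedState.put(key, result);
                output.add(key + "=" + result);
                inFlight--;
            }));
    }

    /** Runs queued mail between elements, then waits until every async call has been handled. */
    List<String> run(List<String> inputs) {
        try {
            for (String key : inputs) {
                processElement(key);
                Runnable mail;
                while ((mail = mailbox.poll()) != null) {
                    mail.run();
                }
            }
            while (inFlight > 0) {
                mailbox.take().run(); // blocks until the next result is handed back
            }
            return output;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            asyncPool.shutdown();
        }
    }

    public static void main(String[] args) {
        MailboxSketch sketch = new MailboxSketch();
        System.out.println(sketch.run(List.of("a", "b", "c")));
        System.out.println(sketch.keyedState);
    }
}
```

This covers thread safety only. If a key's next call must see the previous result, as in the original question, elements for that key would additionally have to be held back until its pending result has been applied, and, as noted above, pending results would need to be part of checkpoints.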