Hi,
I want to use Queryable State to communicate between PUs in the same Flink job. I'm aware this is not the intended use of Queryable State, but I was wondering whether, and how, it could be done. More specifically, I want to query the (event-time) window state of one PU from another PU, while both are in the same window. Any suggestions?

Also, when I was trying to get Queryable State working in my IDE, this took quite a while, because the docs <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#activating-queryable-state> say to "copy the flink-queryable-state-runtime_2.11-1.10.0.jar from the opt/ folder of your Flink distribution, to the lib/ folder." This seems to be relevant only if you're using a local installation of Flink. If you're using the Maven dependencies, you can achieve the same effect by adding the queryable-state-runtime dependency. It could be helpful for other users to mention this in the docs.

Best,
Annemarie
Hi Annemarie
First of all, I'm afraid Flink currently does not support making window state queryable. Support was planned, but it hasn't been implemented due to a lack of ongoing development in this area in the Flink community.
Secondly, I think the doc just wants to tell users how to enable this feature on the server side, since the Flink job needs access to the queryable-state classes. If you're just running your Flink job locally, adding the dependency lets your local job access the queryable-state classes, which is effectively what the doc is telling users.
Best
Yun Tang
From: Annemarie Burger <[hidden email]>
Sent: Monday, May 18, 2020 23:13
To: [hidden email] <[hidden email]>
Subject: Using Queryable State within 1 job + docs suggestion
Hi,
Thanks for your response! What if I'm using regular state instead of window state: is there any way to query the state of one PU from another PU in the same Flink job?

Best,
Annemarie
Hi Annemarie
Actually, I don't know exactly what PU means in your thread. If you mean the task manager, then although I haven't tried it, I think it might be possible to query state within the same job. Maybe you could give it a try.
In general, we would initialize both states in the same operator so that each can read the other, which also provides better performance.
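To make the suggestion concrete: instead of two operators querying each other, both states can live in one operator, so a lookup becomes a local read rather than a cross-task query. In Flink this could be, for example, a `KeyedCoProcessFunction` on two connected, keyed streams; that is an assumption about what is meant here. The sketch below is a plain-Java analogy with no Flink dependency: `MergedOperator`, `stateA` and `stateB` are hypothetical names, and the `HashMap`s stand in for keyed state. As with Flink keyed state, each read is for the same key as the incoming record.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java analogy, not Flink API: one operator owning both pieces of keyed state,
// with one method per input, loosely mirroring processElement1/processElement2.
class MergedOperator {
    // State that would otherwise live in a separate operator "A": latest value per key from input 1.
    private final Map<String, Integer> stateA = new HashMap<>();
    // State that would otherwise live in a separate operator "B": latest value per key from input 2.
    private final Map<String, Integer> stateB = new HashMap<>();

    // Record from input 1: update A, then read B locally instead of querying another task.
    String processElement1(String key, int value) {
        stateA.put(key, value);
        return key + " A=" + value + " B=" + stateB.get(key);
    }

    // Record from input 2: update B, then read A locally.
    String processElement2(String key, int value) {
        stateB.put(key, value);
        return key + " A=" + stateA.get(key) + " B=" + value;
    }

    public static void main(String[] args) {
        MergedOperator op = new MergedOperator();
        System.out.println(op.processElement1("k", 1)); // k A=1 B=null
        System.out.println(op.processElement2("k", 5)); // k A=1 B=5
    }
}
```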
Best
Yun Tang
From: Annemarie Burger <[hidden email]>
Sent: Thursday, May 21, 2020 19:45
To: [hidden email] <[hidden email]>
Subject: Re: Using Queryable State within 1 job + docs suggestion
Hi,
So what I meant was that I have a keyed stream, and from each thread/key group/PU I want to query the state of the other threads/key groups/PUs. Does anybody have any experience with this? I'm currently working on it, and the main problem seems to be that the QueryableStateClient requires the JobID of the job whose state it queries, which in my case is the same as its own job's ID. Any ideas how to work around this? Using env.getStreamGraph.getJobGraph.getJobID doesn't seem to work.

Best,
Annemarie
This in general is not a good idea, as the state you query using queryable state within a job does not provide any consistency guarantees at all.

Would it be possible to have some trigger that emits the state of the windows, and join the states downstream? In general, that is a better approach for what you seem to be trying to achieve.

Otherwise, when it comes to "querying state across operators", that's a hint that the Stateful Functions [1] model could be a better fit for your use case. Specifically, using Stateful Functions, you would model "querying state" as a request to the target function, with the target function sending its state back as a response.

Cheers,
Gordon

[1] https://flink.apache.org/stateful-functions.html
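The trigger-and-downstream-join alternative needs no queries between tasks: each parallel window instance emits its state when the window fires, and a downstream step collects the snapshots per window. A plain-Java sketch, assuming each emitted snapshot carries the window end timestamp and the emitting partition; `Snapshot` and `joinByWindow` are hypothetical names, not Flink API. In a real job the grouping step would presumably be an operator keyed by window end.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (not Flink API) of joining per-partition window snapshots downstream.
class WindowSnapshotJoin {

    // Hypothetical record a window trigger could emit: window end, emitting partition, and its state.
    static final class Snapshot {
        final long windowEnd;
        final int partition;
        final int value;

        Snapshot(long windowEnd, int partition, int value) {
            this.windowEnd = windowEnd;
            this.partition = partition;
            this.value = value;
        }
    }

    // Downstream "join": group every partition's snapshot under the window it belongs to.
    static Map<Long, Map<Integer, Integer>> joinByWindow(List<Snapshot> snapshots) {
        Map<Long, Map<Integer, Integer>> byWindow = new TreeMap<>();
        for (Snapshot s : snapshots) {
            byWindow.computeIfAbsent(s.windowEnd, w -> new TreeMap<>()).put(s.partition, s.value);
        }
        return byWindow;
    }

    public static void main(String[] args) {
        List<Snapshot> emitted = List.of(
                new Snapshot(1000L, 0, 3),
                new Snapshot(1000L, 1, 5),
                new Snapshot(2000L, 0, 7));
        System.out.println(joinByWindow(emitted)); // {1000={0=3, 1=5}, 2000={0=7}}
    }
}
```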
In reply to this post by Annemarie Burger
Hi,

That in general is not a good idea, both because of the problem you mentioned and because the state you query within the same job using queryable state comes with no consistency guarantees.

When it comes to "querying state from another operator", that is a hint that your use case could potentially be better modeled using the Stateful Functions framework [1]. With Stateful Functions, you would model this as a request message to the target function, with the target function replying with a response carrying its state. There are still other shortcomings, though; for example, StateFun currently doesn't support windowed state.

Cheers,
Gordon

[1] https://flink.apache.org/stateful-functions.html

On Thu, May 21, 2020 at 10:25 PM Annemarie Burger <[hidden email]> wrote:
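The request/reply model described above can be simulated in plain Java to show the message flow: the requester never reads the target's state directly; it sends a request, and the target answers with a message carrying its state. This is a hypothetical simulation, not the Stateful Functions SDK; `Message`, the `GET_STATE`/`STATE` kinds and the single-threaded dispatcher are illustrative assumptions. In this sketch, because messages are handled one at a time, the reply reflects the target's state at the moment it processed the request, not a globally consistent snapshot.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Plain-Java simulation of the request/reply pattern; this is NOT the Stateful Functions SDK API.
class RequestReplySketch {

    static final class Message {
        final String from;
        final String to;
        final String kind;    // "GET_STATE" (request) or "STATE" (reply)
        final String payload;

        Message(String from, String to, String kind, String payload) {
            this.from = from;
            this.to = to;
            this.kind = kind;
            this.payload = payload;
        }
    }

    private final Queue<Message> mailbox = new ArrayDeque<>();
    // Each function id owns its state; no function reads another function's entry directly.
    private final Map<String, String> stateById = new HashMap<>();
    // What each requester learned from replies, keyed by requester id.
    final Map<String, String> received = new HashMap<>();

    void setState(String functionId, String value) {
        stateById.put(functionId, value);
    }

    void send(Message m) {
        mailbox.add(m);
    }

    // Deliver messages one at a time, like a single-threaded dispatcher.
    void run() {
        while (!mailbox.isEmpty()) {
            Message m = mailbox.poll();
            if (m.kind.equals("GET_STATE")) {
                // The target replies with its own state; this is the only way the requester sees it.
                send(new Message(m.to, m.from, "STATE", stateById.get(m.to)));
            } else if (m.kind.equals("STATE")) {
                received.put(m.to, m.from + "=" + m.payload);
            }
        }
    }

    public static void main(String[] args) {
        RequestReplySketch s = new RequestReplySketch();
        s.setState("partition-2", "edges:[a,b]");
        s.send(new Message("partition-1", "partition-2", "GET_STATE", null));
        s.run();
        System.out.println(s.received.get("partition-1")); // partition-2=edges:[a,b]
    }
}
```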
Hi Gordon,
Yes, we are well aware of the inconsistencies that can (and will) emerge when using queryable state like this. However, we will handle them manually to ensure the correctness of our target applications. So, could you help with Annemarie's question, or do you know of anyone on this list who has done this, so that we can include them in the discussion? Thanks in advance.

Best,
Max
In reply to this post by Tzu-Li (Gordon) Tai
Hi,
I managed to work around the JobID issue by first starting the task that queries the state, pausing it, then using env.executeAsync.getJobID to get the proper JobID to use when querying the state, and passing that to the (paused) query task, which can then continue. However, the queryable state CompletableFuture objects always return empty. Below is the relevant code. Any idea what I'm doing wrong? Any help is much appreciated.

The state is a MapState<GradoopId, HashMap<GradoopId, TemporalEdge>>. This represents an edge list of a graph, indexed first by source vertex ID and then by target vertex ID.

    // The method call to get all edges from another graph partition/thread which have a certain srcId.
    HashMap<GradoopId, TemporalEdge> answer = QS.getSrcVertex(partitionId, srcId);

    // The method itself. The answer returned is always null, even when the queried partition's state includes the srcId.
    public HashMap<GradoopId, TemporalEdge> getSrcVertex(Integer partitionId, GradoopId srcVertex)
            throws ExecutionException, InterruptedException {
        CompletableFuture<MapState<GradoopId, HashMap<GradoopId, TemporalEdge>>> resultFuture =
                client.getKvState(jobID, "queryableState", partitionId, new TypeHint<Integer>(){}, descriptor);

        AtomicReference<HashMap<GradoopId, TemporalEdge>> answer = new AtomicReference<>();

        resultFuture.thenAccept(response -> {
            // These prints are never reached
            try {
                answer.set(response.get(srcVertex));
                System.out.println(response.get(srcVertex));
            } catch (Exception e) {
                System.out.println("We dont have state");
            }
        });
        return answer.get();
    }

    // The descriptor used
    descriptor = new MapStateDescriptor<GradoopId, HashMap<GradoopId, TemporalEdge>>(
            "edgeList",
            TypeInformation.of(new TypeHint<GradoopId>() { }).createSerializer(new ExecutionConfig()),
            TypeInformation.of(new TypeHint<HashMap<GradoopId, TemporalEdge>>() { }).createSerializer(new ExecutionConfig())
    );

    // The client
    client = new QueryableStateClient("127.0.0.1", 9067);
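One likely contributor to the always-null result, independent of Flink: `thenAccept` only registers a callback and returns immediately, so `return answer.get()` in `getSrcVertex` runs before the query has completed. In addition, if the future completes exceptionally, the `thenAccept` callback is skipped entirely, which would also explain why the prints are never reached. The sketch below reproduces both effects with plain `CompletableFuture`; `fakeGetKvState` is a hypothetical stand-in for `client.getKvState(...)`, not part of any Flink API. Blocking on the future (or returning the future and composing on it) avoids the race and surfaces failures as an `ExecutionException`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class AsyncLookupDemo {

    // Hypothetical stand-in for client.getKvState(...): the result arrives later, on another thread.
    static CompletableFuture<Map<String, String>> fakeGetKvState() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(200); // simulated network latency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            Map<String, String> state = new HashMap<>();
            state.put("src-1", "edges-of-src-1");
            return state;
        });
    }

    // Same shape as getSrcVertex: register a callback, then read the reference immediately.
    static String lookupWithCallback(String srcVertex) {
        AtomicReference<String> answer = new AtomicReference<>();
        fakeGetKvState().thenAccept(state -> answer.set(state.get(srcVertex)));
        return answer.get(); // the callback has (almost always) not run yet, so this is null
    }

    // Blocking variant: wait for the result; a failed query surfaces as ExecutionException.
    static String lookupBlocking(String srcVertex) throws ExecutionException, InterruptedException {
        return fakeGetKvState().get().get(srcVertex);
    }

    // A future that completes exceptionally never invokes its thenAccept callback.
    static boolean callbackRunsOnFailure() {
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<Map<String, String>> failed = new CompletableFuture<>();
        failed.thenAccept(state -> ran.set(true));
        failed.completeExceptionally(new IllegalStateException("query failed"));
        return ran.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("callback: " + lookupWithCallback("src-1"));
        System.out.println("blocking: " + lookupBlocking("src-1"));
        System.out.println("callback ran on failure: " + callbackRunsOnFailure());
    }
}
```

Separately, and only as something worth verifying: the `QueryableStateClient` talks to the queryable state proxy, whose default port is 9069 (`queryable-state.proxy.ports`), whereas 9067 is the default of `queryable-state.server.ports`.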