Using Queryable State within 1 job + docs suggestion

Annemarie Burger
Hi,

I want to use Queryable State to communicate between PUs in the same Flink
job. I'm aware this is not the intended use of Queryable State, but I was
wondering whether and how it could be done.
More specifically, I want to query the (event-time) window state of one PU
from another PU while both are in the same window. Any suggestions?

Also, getting Queryable State working in my IDE took quite a while, because
the docs
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#activating-queryable-state>
say to "copy the flink-queryable-state-runtime_2.11-1.10.0.jar from the opt/
folder of your Flink distribution, to the lib/ folder." This seems to be
relevant only if you're running a local Flink distribution. If you're using
the Maven dependencies, you can achieve the same effect by adding the
queryable-state-runtime dependency. It could help other users to mention
this in the docs.

Best,
Annemarie



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Using Queryable State within 1 job + docs suggestion

Yun Tang
Hi Annemarie

First of all, I'm afraid Flink currently does not support making window state queryable. Support was planned, but it hasn't been implemented due to a lack of continuous development in this area within the Flink community.

Secondly, I think the doc just wants to tell users how to enable this feature on the server side, since the Flink job needs access to the queryable-state classes. If you're only running your Flink job locally, adding the dependency lets your local job access the queryable-state classes, which is effectively what the doc is trying to tell users.

Best
Yun Tang

From: Annemarie Burger <[hidden email]>
Sent: Monday, May 18, 2020 23:13
To: [hidden email] <[hidden email]>
Subject: Using Queryable State within 1 job + docs suggestion
 

Re: Using Queryable State within 1 job + docs suggestion

Annemarie Burger
Hi,

Thanks for your response!
What if I'm using regular state instead of window state? Is there any way to
query the state of one PU from another PU in the same Flink job?

Best,
Annemarie




Re: Using Queryable State within 1 job + docs suggestion

Yun Tang
Hi Annemarie

Actually, I do not know exactly what PU means in your thread. If you mean the task manager, then although I haven't tried it, I think it might be possible to query state within the same job. Maybe you could give it a try.

In general, we would initialize both states in the same operator so that each can access the other, which provides better performance.

Best
Yun Tang

From: Annemarie Burger <[hidden email]>
Sent: Thursday, May 21, 2020 19:45
To: [hidden email] <[hidden email]>
Subject: Re: Using Queryable State within 1 job + docs suggestion
 

Re: Using Queryable State within 1 job + docs suggestion

Annemarie Burger
Hi,

So what I meant was that I have a keyed stream, and from each
thread/key group/PU I want to query the state of the other
threads/key groups/PUs.
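Queryable state lookups are addressed by key, not by thread: the client sends a key, and Flink maps that key to a key group and the key group to the parallel instance that owns it. The plain-Java sketch below illustrates that two-step mapping. The key-group-to-instance formula follows Flink's `KeyGroupRangeAssignment` as far as I know, but the hash is a simplified stand-in (Flink applies a murmur hash to `hashCode()`), so the concrete key-group numbers will differ from a real job. The class and method names are hypothetical.

```java
import java.util.List;

final class KeyGroupRouting {

    // Simplified stand-in for Flink's key-group hash (assumption: Flink itself
    // applies a murmur hash to key.hashCode(); only the modulo step is kept here).
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key groups are split into contiguous ranges, one range per parallel instance.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Full route: key -> key group -> parallel instance.
    static int operatorIndexForKey(Object key, int maxParallelism, int parallelism) {
        int keyGroup = keyGroupFor(key, maxParallelism);
        return operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        for (String vertex : List.of("v1", "v2", "v3", "v4")) {
            System.out.printf("key %s -> key group %d -> instance %d%n",
                    vertex,
                    keyGroupFor(vertex, maxParallelism),
                    operatorIndexForKey(vertex, maxParallelism, parallelism));
        }
    }
}
```

One consequence for querying from inside the job: the key passed to `getKvState` has to be a key of the queried state, not a thread or partition number, unless the stream really is keyed by that number.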

Does anybody have any experience with this?

I'm currently working on it, and the main problem seems to be that the
Queryable State Client requires the JobID of the job whose state it queries,
which in my case is the same as its own job ID. Any ideas how to
work around this?
Using env.getStreamGraph.getJobGraph.getJobID doesn't seem to work.

Best,
Annemarie




Re: Using Queryable State within 1 job + docs suggestion

Tzu-Li (Gordon) Tai
This in general is not a good idea, as the state you query using queryable
state within a job does not provide any consistency guarantees at all.

Would it be possible to have some trigger that emits the state of the windows,
and then join the states downstream?
In general, that is a better approach for what you seem to be trying to
achieve.
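To make the suggested alternative concrete: instead of reading each other's state, every parallel instance emits a summary of its window when the window fires, and a downstream step groups those summaries by window and combines them. The sketch below simulates only that data flow in plain Java. `WindowSummary` and `joinByWindow` are hypothetical names; in an actual Flink job the emitting side would be a window function or trigger, and the combining side could, for example, be a `keyBy` on the window end followed by another window or process function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class WindowStateJoin {

    // What each parallel instance emits when its window fires:
    // the window, the emitting instance, and a summary of its local state.
    static final class WindowSummary {
        final long windowEnd;
        final int instance;
        final List<String> localKeys;

        WindowSummary(long windowEnd, int instance, List<String> localKeys) {
            this.windowEnd = windowEnd;
            this.instance = instance;
            this.localKeys = localKeys;
        }
    }

    // Downstream "join": group emitted summaries by window end and merge the
    // per-instance state, so each window sees the union of all instances.
    static Map<Long, List<String>> joinByWindow(List<WindowSummary> emitted) {
        Map<Long, List<String>> merged = new TreeMap<>();
        for (WindowSummary s : emitted) {
            merged.computeIfAbsent(s.windowEnd, w -> new ArrayList<>()).addAll(s.localKeys);
        }
        return merged;
    }

    public static void main(String[] args) {
        List<WindowSummary> emitted = List.of(
                new WindowSummary(1000L, 0, List.of("a", "b")),
                new WindowSummary(1000L, 1, List.of("c")),
                new WindowSummary(2000L, 0, List.of("d")));
        System.out.println(joinByWindow(emitted)); // {1000=[a, b, c], 2000=[d]}
    }
}
```

Because every instance's contribution arrives as a regular record, the combined view is produced by normal dataflow rather than by out-of-band reads of another instance's state.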

Otherwise, when it comes to "querying state across operators", that's a hint
that the Stateful Functions [1] model could be a better fit for your
use case. Specifically, with Stateful Functions you would model
"querying state" as a request to the target function, with the target
function sending its state back as a response.
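The request/response idea can be illustrated without any framework: the querying side sends a request message addressed to the owner of the state, and the owner answers with a message carrying the value, so only the owner ever touches its own state. The sketch below is a plain-Java simulation of that pattern with an in-memory message queue. It does not use the Stateful Functions API, and every class and method name in it is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

final class RequestReplyDemo {

    // A request carries value == null; a reply carries the requested value.
    static final class Message {
        final String from;
        final String to;
        final String key;
        final String value;

        Message(String from, String to, String key, String value) {
            this.from = from;
            this.to = to;
            this.key = key;
            this.value = value;
        }
    }

    // Each instance owns private state; others can only reach it via messages.
    static final class FunctionInstance {
        final String id;
        final Map<String, String> state = new HashMap<>();
        final Map<String, String> answers = new HashMap<>();

        FunctionInstance(String id) {
            this.id = id;
        }

        void handle(Message m, Queue<Message> outbox) {
            if (m.value == null) {
                // Request: reply with our own value for the key, or "<none>" if absent.
                outbox.add(new Message(id, m.from, m.key, state.getOrDefault(m.key, "<none>")));
            } else {
                // Reply: record the answer we asked for.
                answers.put(m.key, m.value);
            }
        }
    }

    // Delivers messages until none are in flight.
    static void run(Map<String, FunctionInstance> instances, Queue<Message> queue) {
        while (!queue.isEmpty()) {
            Message m = queue.poll();
            instances.get(m.to).handle(m, queue);
        }
    }

    // Instance "a" asks instance "b" (which owns ownerState) for one key.
    static String query(Map<String, String> ownerState, String key) {
        FunctionInstance a = new FunctionInstance("a");
        FunctionInstance b = new FunctionInstance("b");
        b.state.putAll(ownerState);
        Queue<Message> queue = new ArrayDeque<>();
        queue.add(new Message("a", "b", key, null));
        run(Map.of("a", a, "b", b), queue);
        return a.answers.get(key);
    }

    public static void main(String[] args) {
        System.out.println(query(Map.of("v1", "edges of v1"), "v1")); // edges of v1
        System.out.println(query(Map.of(), "v2"));                    // <none>
    }
}
```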

Cheers,
Gordon

[1] https://flink.apache.org/stateful-functions.html




Re: Using Queryable State within 1 job + docs suggestion

Tzu-Li (Gordon) Tai
In reply to this post by Annemarie Burger
Hi,

That in general is not a good idea, because of the problem you mentioned as well as the fact that state
queried within the same job using queryable state comes with no consistency guarantees.

When it comes to "querying state from another operator", it is a hint that your use case could potentially be
better modeled using the Stateful Functions framework [1]. With Stateful Functions, you would model this
as a request message to the target function, with the target function replying with a response carrying its state.
There are still other shortcomings, though; for example, StateFun does not support windowed state yet.

Cheers,
Gordon

[1] https://flink.apache.org/stateful-functions.html

On Thu, May 21, 2020 at 10:25 PM Annemarie Burger <[hidden email]> wrote:

Re: Using Queryable State within 1 job + docs suggestion

m@xi
Hi Gordon,

Yes, we are well aware of the inconsistencies that can (and will) emerge
when using queryable state like this. However, we will handle them manually
to ensure the correctness of our target applications.

Therefore, could you help with Annemarie's question, or do you know of
someone on this list who has done this, so that we could include them in
the discussion?

Thanks in advance.

Best,
Max




Re: Using Queryable State within 1 job + docs suggestion

Annemarie Burger
In reply to this post by Tzu-Li (Gordon) Tai
Hi,

I managed to work around the JobID issue by first starting the task that
queries the state and pausing it, then using env.executeAsync.getJobID to
get the proper job ID for querying the state, and passing that to the
(paused) query-state task, which can then continue.
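The workaround amounts to a one-shot handoff: the querying task blocks until the submitting code publishes the job ID. Below is a plain-JDK sketch of that handoff; it is a guess at the shape of the mechanism, not the actual code from this thread. It assumes the handoff instance is reachable from both sides (for example through a static field), which only works when everything runs in one JVM, as in a local environment in the IDE; on a real cluster the tasks run in separate TaskManager processes. The job ID is a placeholder string. If I'm not mistaken, in Flink 1.10 the real value would come from the `JobClient` returned by `env.executeAsync()`, via `getJobID()`. The class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class JobIdHandoff {

    // One-shot, thread-safe slot for the job ID.
    private final CompletableFuture<String> jobId = new CompletableFuture<>();

    // Submitting side: publish the ID once known. Later calls are ignored.
    void publish(String id) {
        jobId.complete(id);
    }

    // Querying side: "pause" until the ID is published, or fail after the timeout.
    String await(long timeout, TimeUnit unit) {
        try {
            return jobId.get(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the job ID", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("job ID not available", e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        JobIdHandoff handoff = new JobIdHandoff();
        Thread queryTask = new Thread(() -> {
            String id = handoff.await(10, TimeUnit.SECONDS);
            System.out.println("querying state of job " + id);
        });
        queryTask.start();
        // Placeholder for the real ID (hypothetical value).
        handoff.publish("placeholder-job-id");
        queryTask.join();
    }
}
```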

However, the Queryable State CompletableFuture objects always come back empty.
Below is the relevant code. Any idea what I'm doing wrong? Any help is much
appreciated.


The state is a MapState<GradoopId, HashMap<GradoopId, TemporalEdge>>.
This represents the edge list of a graph, organized by source vertex id and
then by target vertex id.

// The method call to get all edges from another graph partition/thread
// which have a certain srcId.
HashMap<GradoopId, TemporalEdge> answer = QS.getSrcVertex(partitionId, srcId);

// The method itself. The answer returned is always null, even when the
// queried partition's state includes the srcId.
public HashMap<GradoopId, TemporalEdge> getSrcVertex(Integer partitionId, GradoopId srcVertex)
        throws ExecutionException, InterruptedException {
    CompletableFuture<MapState<GradoopId, HashMap<GradoopId, TemporalEdge>>> resultFuture =
            client.getKvState(
                    jobID,
                    "queryableState",
                    partitionId,
                    new TypeHint<Integer>(){},
                    descriptor);
    AtomicReference<HashMap<GradoopId, TemporalEdge>> answer = new AtomicReference<>();
    resultFuture.thenAccept(response -> {

        // These prints are never reached
        try {
            answer.set(response.get(srcVertex));
            System.out.println(response.get(srcVertex));
        } catch (Exception e) {
            System.out.println("We dont have state");
        }
    });
    return answer.get();
}

// The descriptor used
descriptor =
        new MapStateDescriptor<GradoopId, HashMap<GradoopId, TemporalEdge>>(
                "edgeList",
                TypeInformation.of(new TypeHint<GradoopId>() {
                }).createSerializer(new ExecutionConfig()),
                TypeInformation.of(new TypeHint<HashMap<GradoopId, TemporalEdge>>() {
                }).createSerializer(new ExecutionConfig())
        );

// The client
client = new QueryableStateClient("127.0.0.1", 9067);
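One likely reason the answer is always null: `thenAccept` only registers a callback and returns immediately, so `answer.get()` runs before the query has completed. And if the future completes exceptionally (for example because the lookup failed), the callback never runs at all and nothing observes the exception, which would also explain why the prints are never reached. The plain-JDK sketch below reproduces the pattern with a simulated slow lookup standing in for `client.getKvState(...)` (the lookup, its 200 ms delay, and the class name are assumptions for illustration) and contrasts it with blocking on the future, which returns the value or surfaces the failure. Separately, if I remember the defaults correctly, the client should connect to the queryable state proxy, whose default port is 9069 (`queryable-state.proxy.ports`), while 9067 is the default of `queryable-state.server.ports`; that is worth double-checking against the cluster configuration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class AsyncLookupPitfall {

    // Simulated remote lookup: completes on another thread after a short delay,
    // standing in for a queryable state request waiting on its reply.
    static CompletableFuture<Map<String, String>> lookup() {
        return CompletableFuture.supplyAsync(() -> {
            sleep(200);
            return Map.of("v1", "edges of v1");
        });
    }

    // Mirrors the original pattern: the callback is registered, but the
    // reference is read immediately, before the callback has had a chance to run.
    static String nonBlocking(String key) {
        AtomicReference<String> answer = new AtomicReference<>();
        lookup().thenAccept(response -> answer.set(response.get(key)));
        return answer.get();
    }

    // Blocking variant: waits (with a timeout) for the result; join() rethrows
    // a failed lookup or a timeout as an unchecked CompletionException.
    static String blocking(String key) {
        Map<String, String> response = lookup().orTimeout(5, TimeUnit.SECONDS).join();
        return response.get(key);
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Prints null unless the lookup happened to finish first.
        System.out.println(nonBlocking("v1"));
        System.out.println(blocking("v1")); // edges of v1
    }
}
```

In the method above, the equivalent change would be to wait on `resultFuture.get()` (its signature already declares `ExecutionException` and `InterruptedException`) and read the value from the returned state, instead of returning from the `AtomicReference` right after `thenAccept`.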


