Queryable state in a keyed stream not querying properly

Philip Doctor

Dear Flink Users,

I’m getting started with Flink and I’ve bumped into a small problem. I have a keyed stream like this:

val stream = env.addSource(consumer)
  .flatMap(new ValidationMap()).name("ValidationMap")
  .keyBy(x => (x.getObj.foo(), x.getObj.bar(), x.getObj.baz()))
  .flatMap(new Calculator(this.config.size, this.config.queryableStateName)).name(jobname)

Within my stream I have a ValueState that I use to maintain a list.

I then query that state with the QueryableStateClient:

client.getKvState(flinkJobID, stateName, serializedKey.hashCode(), serializedKey);

Here, serializedKey corresponds to the key used in the .keyBy on the keyed stream.
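One detail in the call above that may be worth checking: if serializedKey is a byte[], then serializedKey.hashCode() is Java's identity-based array hash, not a hash of the key's contents. Whether that explains the error reported here is not confirmed anywhere in this thread, and the meaning of the third argument should be verified against the Javadoc of the Flink version in use. The sketch below only illustrates the Java behaviour; CompositeKey, its String fields, and toBytes() are hypothetical stand-ins, not Flink types.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

// Hypothetical stand-in for the (foo, bar, baz) key produced by keyBy.
// The field types are assumed to be String purely for illustration.
final class CompositeKey {
    final String foo;
    final String bar;
    final String baz;

    CompositeKey(String foo, String bar, String baz) {
        this.foo = foo;
        this.bar = bar;
        this.baz = baz;
    }

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof CompositeKey)) {
            return false;
        }
        CompositeKey that = (CompositeKey) other;
        return foo.equals(that.foo) && bar.equals(that.bar) && baz.equals(that.baz);
    }

    @Override
    public int hashCode() {
        // Value-based: equal keys always produce the same hash.
        return Objects.hash(foo, bar, baz);
    }

    // Toy serialization for this demo only; this is NOT Flink's serializer.
    byte[] toBytes() {
        return (foo + "|" + bar + "|" + baz).getBytes(StandardCharsets.UTF_8);
    }
}

class KeyHashSketch {
    public static void main(String[] args) {
        CompositeKey first = new CompositeKey("a", "b", "c");
        CompositeKey second = new CompositeKey("a", "b", "c");
        byte[] firstBytes = first.toBytes();
        byte[] secondBytes = second.toBytes();

        // Equal keys give equal value-based hashes.
        System.out.println("key hashes equal: " + (first.hashCode() == second.hashCode()));
        // The serialized contents are identical as well.
        System.out.println("serialized bytes equal: " + Arrays.equals(firstBytes, secondBytes));
        // byte[].hashCode() is inherited from Object and ignores the contents,
        // so these two values are unrelated to the key and usually differ.
        System.out.println("byte[] hashCode: " + firstBytes.hashCode() + " vs " + secondBytes.hashCode());
        // Arrays.hashCode is content-based, shown here only for contrast.
        System.out.println("Arrays.hashCode equal: "
                + (Arrays.hashCode(firstBytes) == Arrays.hashCode(secondBytes)));
    }
}
```

If the client expects the hash of the key object itself, passing the hash of a freshly created byte[] would send each query to an effectively arbitrary location, which would look much like the intermittent behaviour described in this post.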

 

When I query the state, things go wrong. As far as I can tell, the JobManager sends my query to just one of the three TaskManagers I have running, so about 1/3 of the time I get the proper result and the other 2/3 of the time I get:

org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace: KvState does not hold any state for key/namespace.

I feel like I must have somehow misconfigured my job. How can I instruct the JobManager to query the TaskManager that actually holds my data? Is there a specific setting or configuration I’m missing?

Thank you for your time.

-Phil

Re: Queryable state in a keyed stream not querying properly

Aljoscha Krettek

Hi Philip,

The JobManager should figure out the correct TaskManager for your query based on the key. You mentioned that you get the result 1/3 of the time. Is that 1/3 of the time for queries with exactly the same key, or for queries with different keys?
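To make "based on the key" concrete, here is a minimal sketch of key-group style routing: the key's hash selects a key group, and each key group belongs to exactly one parallel subtask. It is modelled on Flink's key-group scheme but is not Flink's code. In particular, floorMod stands in for the hash scrambling Flink applies, and the values 128 (max parallelism) and 3 (parallelism, taken to be one subtask per TaskManager) are assumptions for illustration.

```java
// Simplified routing sketch. All names and constants here are illustrative
// assumptions, not Flink API.
class KeyGroupRoutingSketch {
    static final int MAX_PARALLELISM = 128; // assumed number of key groups
    static final int PARALLELISM = 3;       // assumed: one subtask per TaskManager

    // Maps a key hash to a key group in [0, maxParallelism).
    // floorMod is a stand-in for the real hash scrambling step.
    static int keyGroupFor(int keyHash, int maxParallelism) {
        return Math.floorMod(keyHash, maxParallelism);
    }

    // Maps a key group to the subtask that owns it; key groups are split
    // into contiguous ranges, one range per subtask.
    static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int[] sampleHashes = {17, 4242, -99, 123456789};
        for (int hash : sampleHashes) {
            int keyGroup = keyGroupFor(hash, MAX_PARALLELISM);
            int subtask = subtaskFor(keyGroup, PARALLELISM, MAX_PARALLELISM);
            System.out.println("hash " + hash + " -> key group " + keyGroup
                    + " -> subtask " + subtask);
        }
    }
}
```

Under a scheme like this, the same key hash always lands on the same subtask, so a query that succeeds only some of the time for one and the same key suggests that the hash being sent varies between requests.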

Also, could it be that the state you’re trying to access is simply not there (yet)?

It might very well be that you have discovered a bug. The queryable state feature hasn’t been touched for a while, but we’re planning to work on it during the 1.4 release cycle.

Best,
Aljoscha
On 19. May 2017, at 04:29, Philip Doctor <[hidden email]> wrote:
