I am trying to use queryable state, and am encountering issues when querying the state from the client. I get the following exception:
Exception in thread "main" org.apache.flink.runtime.query.UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'word_sums'. at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleKvStateMessage(JobManager.scala:1532) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:777) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) Exception in thread "main" org.apache.flink.runtime.query.UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'word_sums'. at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleKvStateMessage(JobManager.scala:1532) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:777) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) In my flow, I am creating the queryable state in the following way: final TypeSerializer<Tuple2<String, Integer>> valueSerializer = TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}).createSerializer(new ExecutionConfig()); ValueStateDescriptor<Tuple2<String,Integer>> vsd = new ValueStateDescriptor<>(WORD_SUMS_STATE,valueSerializer); QueryableStateStream<Tuple, Tuple2<String, Integer>> tupleTuple2QueryableStateStream = wordsSummedStream.asQueryableState(WORD_SUMS_STATE, vsd); I am using LocalFlinkMiniCluster and have enabled QueryableStateOptions.SERVER_ENABLE in the configuration. From the logs in the startup of the flow, I see that the queryable state operator is running. I also see the queryable state operation from the web console Is there anything else that I am missing? Thanks, Hayden Marchant |
Hi Hayden,
From what I know, "No KvStateLocation found for KvState instance with name 'word_sums'" is exactly what it means. Your current job can't find the KVState instance. This could result due to a few reasons that I know of: 1. The jobID you supplied for the queryclient job is not equal to the jobID of the state creator job. 2. There is a typo in the name either when you are creating the state or when you are accessing the state. 3. You are connected to a completely different jobmanager and not where the state is. There could be more reasons, but these are the ones on top of myhead. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Hi, As Biplob said this means that the JM cannot find the requested state. The reasons can be one of the above but given that you said you are using the FlinkMiniCluster, I assume you are testing. In this case, it can also be that you start querying your state to soon after the job is submitted, so the state is not yet there as the job that creates it has not yet started. In this case you can retry the query when it fails until it succeeds (assuming that none of the coditions that Biplob mentioned holds). If you have access to the flink code, you can check the tests in the queryablestate IT cases. Kostas On Sep 13, 2017 4:45 PM, "Biplob Biswas" <[hidden email]> wrote: Hi Hayden, |
Free forum by Nabble | Edit this page |