Fwd: Error querying flink state

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Fwd: Error querying flink state

Falak Kansal

Hi,

I have set up a flink cluster on my local machine. I created a flink job (TrackMaximumTemperature) and made the state queryable. I am using github/streamingwithflink/chapter7/QueryableState.scala example from https://github.com/streaming-with-flink repository. Please find the file attached.

Now i have the running job id and when i go and try to access the state, it throws an exception. I see the job is running and I am using the correct jobId. Also checkpointing is enabled in the original job and i have set the properties related to checkpointing in flink-conf.yaml. Am I missing something? Any leads will be appreciated. Thank you :)


Exception stack trace:
Caused by: org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could not retrieve location of state=maxTemperature of job=9a528bf3e1b650aed7e0b1e26d038ad5. Potential reasons are: i) the state is not ready, or ii) the job does not exist.
at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)


QueryableState.txt (5K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Error querying flink state

Till Rohrmann
Hi Falak,

Which version of Flink are you using? Providing us with the debug logs could also help understanding what's going wrong.

I guess that you have copied the flink-queryable-state-runtime jar into the lib directory and set queryable-state.enable: true in the configuration, right? Here is the link to the documentation for queryable state [1] for more details.


Cheers,
Till

On Thu, Jan 14, 2021 at 1:18 PM Falak Kansal <[hidden email]> wrote:

Hi,

I have set up a flink cluster on my local machine. I created a flink job (TrackMaximumTemperature) and made the state queryable. I am using github/streamingwithflink/chapter7/QueryableState.scala example from https://github.com/streaming-with-flink repository. Please find the file attached.

Now i have the running job id and when i go and try to access the state, it throws an exception. I see the job is running and I am using the correct jobId. Also checkpointing is enabled in the original job and i have set the properties related to checkpointing in flink-conf.yaml. Am I missing something? Any leads will be appreciated. Thank you :)


Exception stack trace:
Caused by: org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could not retrieve location of state=maxTemperature of job=9a528bf3e1b650aed7e0b1e26d038ad5. Potential reasons are: i) the state is not ready, or ii) the job does not exist.
at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

Reply | Threaded
Open this post in threaded view
|

Re: Error querying flink state

Till Rohrmann
Hi Falak,

it is hard to tell what is going wrong w/o the debug logs. Could you check whether they contain anything specific? You can also share them with us.

Cheers,
Till

On Wed, Jan 20, 2021 at 1:04 PM Falak Kansal <[hidden email]> wrote:
Hi, 

Thank you so much for the response. I am using the 1.12 version and after configurational changes I am able to query the state. 

Although what issue I am facing is, I am able to query the state of the first submitted job only. Later on if i query the state of a different job i see the same exception. I made sure, I am using a different state name for the next submitted job and I am using the correct jobId in the query.


Thank you
Falak

On Mon, Jan 18, 2021 at 11:28 PM Till Rohrmann <[hidden email]> wrote:
Hi Falak,

Which version of Flink are you using? Providing us with the debug logs could also help understanding what's going wrong.

I guess that you have copied the flink-queryable-state-runtime jar into the lib directory and set queryable-state.enable: true in the configuration, right? Here is the link to the documentation for queryable state [1] for more details.


Cheers,
Till

On Thu, Jan 14, 2021 at 1:18 PM Falak Kansal <[hidden email]> wrote:

Hi,

I have set up a flink cluster on my local machine. I created a flink job (TrackMaximumTemperature) and made the state queryable. I am using github/streamingwithflink/chapter7/QueryableState.scala example from https://github.com/streaming-with-flink repository. Please find the file attached.

Now i have the running job id and when i go and try to access the state, it throws an exception. I see the job is running and I am using the correct jobId. Also checkpointing is enabled in the original job and i have set the properties related to checkpointing in flink-conf.yaml. Am I missing something? Any leads will be appreciated. Thank you :)


Exception stack trace:
Caused by: org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could not retrieve location of state=maxTemperature of job=9a528bf3e1b650aed7e0b1e26d038ad5. Potential reasons are: i) the state is not ready, or ii) the job does not exist.
at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)