Queryable State unavailable after Kubernetes HA State cleanup


Sandeep khanzode
Hello,

I'm stuck at this point, and any help would be appreciated.


I am able to create a queryable state and query it; everything works correctly:
// Key the stream by the first tuple field and publish it as queryable state named "queryableVO"
KeyedStream<Tuple2<Key, VO>, Key> stream = sourceStream.keyBy(t2 -> t2.f0);
stream.asQueryableState("queryableVO");


I deploy this on a Kubernetes cluster as a Flink standalone job with Kubernetes HA services (KubernetesHaServicesFactory).

Two kinds of state are created. The first is the operator and keyed state, kept in a RocksDB backend that checkpoints to S3.

The second is the HA state, which the Kubernetes HA services maintain in S3.

If anything changes in the job's main class (e.g., operators are removed), the upgrade does not go through seamlessly, and I have to delete the HA state from S3.

Once I delete the HA state from S3, the queryable state becomes unusable, i.e., I can no longer query it. Interestingly, the other operator and keyed states in the RocksDB backend are still accessible; only the queryable state is not.

In the web UI, the checkpointed state of the queryable stream shows a size of roughly 50-60 KB, yet I still cannot query it.


Thanks,
Sandeep


Re: Queryable State unavailable after Kubernetes HA State cleanup

Till Rohrmann
Hi Sandeep,

I don't fully understand the problematic scenario yet. What exactly is the HA state maintained by Kubernetes in S3? 

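Queryable state works by asking an operator for its current state. If you use asQueryableState, the keyed stream is published as keyed state that every incoming element updates: the single-argument variant creates a ValueState holding the latest element per key, while passing a ReducingStateDescriptor creates a ReducingState that folds the elements together. This state is stored in the configured state backend (in your case probably RocksDB), and for checkpoints it is periodically written to S3.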
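Depending on the StateDescriptor it is given, asQueryableState publishes either a ValueState (each element overwrites the value for its key) or a ReducingState (each element is folded into the stored value with a reduce function). The sketch below is a plain-Java model of those two update rules, not Flink's API; the class KeyedStateModel and its methods are hypothetical names used only for illustration, and a HashMap stands in for the state backend.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of per-key state updates; NOT Flink's API (hypothetical class).
final class KeyedStateModel<K, V> {
    // Stands in for the keyed state held by the state backend.
    private final Map<K, V> state = new HashMap<>();
    // null means "overwrite" (ValueState-like); non-null means "reduce" (ReducingState-like).
    private final BinaryOperator<V> reducer;

    private KeyedStateModel(BinaryOperator<V> reducer) {
        this.reducer = reducer;
    }

    // ValueState-like: each element replaces the stored value for its key.
    static <K, V> KeyedStateModel<K, V> overwriting() {
        return new KeyedStateModel<>(null);
    }

    // ReducingState-like: each element is combined with the stored value.
    static <K, V> KeyedStateModel<K, V> reducing(BinaryOperator<V> reducer) {
        return new KeyedStateModel<>(reducer);
    }

    // Called once per stream element, like the operator behind asQueryableState.
    void add(K key, V value) {
        if (reducer == null) {
            state.put(key, value);
        } else {
            state.merge(key, value, reducer);
        }
    }

    // Point lookup for a single key, mirroring how a queryable-state client asks for one key; null if unseen.
    V query(K key) {
        return state.get(key);
    }
}
```

With overwriting(), adding 1 and then 5 for key "a" leaves 5; with reducing(Integer::sum), the same two elements leave 6.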

How exactly does the query fail? Have you checked the cluster logs for anything suspicious?
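One way to answer that question precisely is to unwrap the failed query down to its root cause. QueryableStateClient.getKvState returns a CompletableFuture, so a generic helper like the following sketch can turn a failed query into a one-line message. It is plain Java with no Flink dependency; the class name QueryFailureReport is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical helper: reports the outcome of an asynchronous query.
final class QueryFailureReport {
    // Walks the cause chain down to the innermost exception.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    // Blocks on the future and returns a one-line description of the result or failure.
    static String describe(CompletableFuture<?> future) {
        try {
            Object result = future.get();
            return "OK: " + result;
        } catch (ExecutionException e) {
            Throwable root = rootCause(e);
            return "FAILED: " + root.getClass().getName() + ": " + root.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        }
    }
}
```

For example, passing the future returned by client.getKvState(jobId, "queryableVO", key, keyType, descriptor) to describe would show either the returned state object or the innermost exception; jobId, key, keyType, and descriptor are placeholders here.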

Cheers,
Till

On Wed, Apr 28, 2021 at 5:26 PM Sandeep khanzode <[hidden email]> wrote: