Hi All,
I have a couple of questions regarding state maintenance in Flink.
- I have a connected stream, then a keyBy operator followed by a flatMap function. I use MapState; keys are added by data from stream1 and removed by messages from stream2, which acts as a control stream in my pipeline. When the keys are removed, will the state in RocksDB also be removed? How does RocksDB get the most recent state?
- Can I use a Guava cache in MapState, like MapState<String, Cache<String, String>>? Do I have to write a serializer to persist data from the Guava cache?
- One of my downstream operators requires keyed state because I need to query the state value, but it also has two huge state values that are basically the same across all parallel operator instances. Initially I used operator state and checkpointed only in the operator instance with index 0, so the other instances would not checkpoint the same data. How can I achieve this in keyed state? Each operator will have around 10GB of the same data. Not sure if this will be a problem in the future.
Thanks,
Navneeth
Hi Navneeth,
Answering your three questions separately:
1. Yes. Your MapState will be backed by RocksDB, so when you remove an entry from the map state, it is removed from the local RocksDB instance as well.
2. If state classes are not POJOs, they will be serialized by Kryo unless a custom serializer is explicitly specified. You can take a look at this document on how to do that [1].
3. I need more information to make a proper suggestion for this one. How are you using the "huge state values"? From what you described, it seems like you only need them on one of the parallel instances, so I'm curious what they are actually used for. Are they needed when processing your records?
Cheers,
Gordon
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#custom-serialization-for-managed-state
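As an illustration of question 2: the core of any custom serializer is a symmetric write/read pair. The sketch below uses plain JDK streams as a stand-in for Flink's serializer plumbing, so it is not Flink's serializer API. It assumes that only the cache's entries (String keys and values, for example the view a Guava Cache exposes through asMap()) need to survive a checkpoint and that the cache is rebuilt from them on restore; eviction settings and access times are not preserved. The class name CacheEntriesCodec and its methods are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink or Guava.
public class CacheEntriesCodec {

    // Writes the entry count, then each key/value pair.
    // Note: writeUTF limits each string to 65535 encoded bytes.
    static byte[] serialize(Map<String, String> entries) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(entries.size());
            for (Map.Entry<String, String> e : entries.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeUTF(e.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the entries back in the order they were written.
    static Map<String, String> deserialize(byte[] data) {
        Map<String, String> entries = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String key = in.readUTF();
                String value = in.readUTF();
                entries.put(key, value);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return entries;
    }

    public static void main(String[] args) {
        Map<String, String> cacheContents = new LinkedHashMap<>();
        cacheContents.put("user-1", "profile-a");
        cacheContents.put("user-2", "profile-b");

        Map<String, String> restored = deserialize(serialize(cacheContents));
        System.out.println(restored.equals(cacheContents)); // prints "true"
    }
}
```

A real Flink serializer would implement the same two steps against Flink's own input and output views, as described in [1].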
Thanks Gordon for your response. I have around 80 parallel flatMap operator instances, and each instance requires three states. One of them is user state: each operator instance holds a unique set of users' data, and I need this data to be queryable. The other two are more or less static states that are only modified when there is an update message in the config stream. This static data could easily be around 2GB. In my previous approach I used operator state, where the data is retrieved inside the open method in all operator instances but checkpointed in only one of them. One of the issues I have is: if I change the operator parallelism, how would it affect the internal state?
On Tue, Sep 5, 2017 at 5:36 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Navneeth, there's a lower-level state interface that should address your requirements: OperatorStateStore.getUnionListState(). The OperatorStateStore is a bit hidden. You have to implement the CheckpointedFunction interface. When CheckpointedFunction.initializeState(FunctionInitializationContext context) is called, the context has a method getOperatorStateStore(). I'd recommend having a look at the detailed JavaDocs of all involved classes and methods.
2017-09-05 19:35 GMT+02:00 Navneeth Krishnan <[hidden email]>:
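To make the effect of union list state on rescaling concrete, here is a small plain-Java model. It is only a sketch, under the assumption that on restore union list state hands every parallel instance the complete list of elements checkpointed by all instances. It does not use Flink's API; the class UnionRestoreModel and the method restoreUnion are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of union redistribution, not Flink code.
public class UnionRestoreModel {

    // snapshots.get(i) is what subtask i wrote at checkpoint time.
    // Returns one restored list per subtask at the new parallelism.
    static <T> List<List<T>> restoreUnion(List<List<T>> snapshots, int newParallelism) {
        List<T> union = new ArrayList<>();
        for (List<T> snapshot : snapshots) {
            union.addAll(snapshot);
        }
        List<List<T>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            // Every subtask gets its own copy of the full union.
            restored.add(new ArrayList<>(union));
        }
        return restored;
    }

    public static void main(String[] args) {
        // Only subtask 0 checkpoints the shared config; subtasks 1 and 2 write nothing.
        List<String> fromSubtask0 = new ArrayList<>();
        fromSubtask0.add("configA");
        fromSubtask0.add("configB");

        List<List<String>> snapshots = new ArrayList<>();
        snapshots.add(fromSubtask0);
        snapshots.add(new ArrayList<String>());
        snapshots.add(new ArrayList<String>());

        // Rescale from 3 to 5 subtasks.
        List<List<String>> restored = restoreUnion(snapshots, 5);
        System.out.println(restored.size()); // prints 5
        System.out.println(restored.get(4)); // prints [configA, configB]
    }
}
```

Under that assumption, checkpointing the shared data in a single instance keeps the checkpoint small, and a change in parallelism still leaves every instance with the full data after restore.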
Will I be able to use both queryable MapState and union list state while implementing the CheckpointedFunction interface? One of my major requirements for that operator is to provide queryable state, and in order to compute that state we need the common static state across all parallel operator instances. Thanks.
On Thu, Sep 7, 2017 at 12:44 AM, Fabian Hueske <[hidden email]> wrote:
Only KeyedState can be used as queryable state, so you cannot query the OperatorState. AFAIK, it should not be a problem if an operator has both OperatorState and queryable KeyedState.
2017-09-07 17:01 GMT+02:00 Navneeth Krishnan <[hidden email]>: