Hello,
Is it possible to have managed operator state where all the parallel operator instances know that they hold the same state? That way, it would be snapshotted by only one of them, and in case of failure (or rescaling) all of them would restore from that snapshot. For the rescaling case I guess I could use union redistribution and just keep one of the elements of the list, but given the size of the state it would be more efficient if this could be avoided. Thanks, Gerard
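To make the size concern concrete, here is a plain-Java model (not Flink code) of the two ways list-style operator state can be redistributed on restore, assuming each old subtask snapshotted a full copy "S" of the same state. The names `RedistributionModel`, `union` and `evenSplit` are hypothetical, and even-split is modelled as round-robin, which only approximates how Flink divides the list.

```java
import java.util.ArrayList;
import java.util.List;

public class RedistributionModel {

    // Union redistribution: every new subtask receives the concatenation of all old lists.
    static List<List<String>> union(List<List<String>> snapshots, int newParallelism) {
        List<String> all = new ArrayList<>();
        for (List<String> s : snapshots) {
            all.addAll(s);
        }
        List<List<String>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>(all));
        }
        return restored;
    }

    // Even-split redistribution (modelled as round-robin): each element goes to exactly one new subtask.
    static List<List<String>> evenSplit(List<List<String>> snapshots, int newParallelism) {
        List<List<String>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>());
        }
        int next = 0;
        for (List<String> s : snapshots) {
            for (String element : s) {
                restored.get(next % newParallelism).add(element);
                next++;
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        // Three old subtasks, each holding a full copy of the same (large) state "S".
        List<List<String>> snapshots = List.of(List.of("S"), List.of("S"), List.of("S"));
        System.out.println("union:      " + union(snapshots, 4));
        System.out.println("even-split: " + evenSplit(snapshots, 4));
    }
}
```

Scaling from 3 to 4 subtasks, the union model hands every new subtask three identical copies, while the even-split model leaves the fourth subtask with nothing. That is why union plus "keep one element" works, but at the cost of transferring N copies of the state to each instance.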
Hi Gerard, this type of state is not supported yet, but it is on the roadmap. The feature is called Broadcast State, and a description is in JIRA [1]. (Replying to gerardg <[hidden email]>, 2017-07-03 17:50 GMT+02:00.)
Hi,
I think you can do this already: take a look at OperatorStateStore::getUnionListState(…). You could add state to it on only one parallel operator instance, e.g. by checking the subtask index. Best, Stefan
Thanks Fabian, I'll keep an eye on that JIRA.
I'm not sure I follow you, Stefan. Do you mean that I could implement my own OperatorStateStore and override its methods (e.g. snapshot and restore) to achieve this? I don't think I know enough about Flink's internals to implement that easily. Gerard
What I mean is that you could obtain such a state in

    initializeState(FunctionInitializationContext context) {
        context.getOperatorStateStore().getUnionListState(…);
    }

and in snapshotState(…) you just insert the state in only one of the parallel instances. Which instance does so can be decided by the subtask index (e.g. only add to the list state if your subtask index is == 0). You can obtain the subtask index with getRuntimeContext().getIndexOfThisSubtask(). UnionListState replicates all submitted states to all parallel instances, so what was checkpointed on one operator instance will be replicated to all of them on restore. Best, Stefan
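The snapshot/restore cycle described above can be modelled in plain Java, without Flink on the classpath. In this sketch `Instance.stateList` stands in for the list obtained from getUnionListState(…) and `checkpointAndRestore` stands in for Flink's union redistribution; all names are hypothetical. One assumption worth calling out: every instance clears its list at the start of its snapshot, otherwise the instances other than subtask 0 would re-snapshot the copy they received on restore and the number of copies would grow with each checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

public class SingleWriterUnionState {

    // Stand-in for one parallel instance; 'stateList' plays the role of the
    // union list state in the real operator (hypothetical model, not Flink API).
    static final class Instance {
        final int subtaskIndex;
        String sharedState;                          // identical on every instance
        final List<String> stateList = new ArrayList<>();

        Instance(int subtaskIndex, String sharedState) {
            this.subtaskIndex = subtaskIndex;
            this.sharedState = sharedState;
        }

        // Mirrors snapshotState(...): every instance clears, only subtask 0 adds.
        void snapshotState() {
            stateList.clear();
            if (subtaskIndex == 0) {
                stateList.add(sharedState);
            }
        }

        // Mirrors initializeState(...) on restore: take the single element, if any.
        void restore(List<String> unionOfAllLists) {
            stateList.clear();
            stateList.addAll(unionOfAllLists);
            sharedState = stateList.isEmpty() ? null : stateList.get(0);
        }
    }

    // Simulates a checkpoint followed by a restore at a (possibly different) parallelism.
    static List<Instance> checkpointAndRestore(List<Instance> running, int newParallelism) {
        List<String> union = new ArrayList<>();
        for (Instance i : running) {
            i.snapshotState();
            union.addAll(i.stateList);
        }
        List<Instance> restored = new ArrayList<>();
        for (int idx = 0; idx < newParallelism; idx++) {
            Instance inst = new Instance(idx, null);
            inst.restore(union);                     // union redistribution: everyone gets everything
            restored.add(inst);
        }
        return restored;
    }

    public static void main(String[] args) {
        List<Instance> running = new ArrayList<>();
        for (int idx = 0; idx < 3; idx++) {
            running.add(new Instance(idx, "model-v7"));
        }
        for (Instance i : checkpointAndRestore(running, 5)) {
            System.out.println(i.subtaskIndex + " -> " + i.sharedState
                    + " (list size " + i.stateList.size() + ")");
        }
    }
}
```

Rescaling from 3 to 5 instances, every restored instance ends up with the same `sharedState` and a single list element, and a second checkpoint/restore cycle still yields one element per instance because of the clear-then-add step.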
Hi, +1 to what Stefan is suggesting; we have been using similar logic for a while:

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        updateBroadcastState(); // application helper, not shown
        super.snapshotState(context);
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        initBroadcastState(); // application helper, not shown
    }

Gyula

On Tue, 4 Jul 2017 at 15:19, Stefan Richter <[hidden email]> wrote:
Hello Gyula & Stefan,
I am trying to resolve a similar situation, described in [1]. I am also using *managed operator state*, but I find the Flink documentation unclear on this point. So I have the following questions:

1. Can I concatenate the partial states of all the parallel instances of, for example, a .process() operator? I believe the answer is yes and that I should use unionListState(), but I do not know how.
2. Can I perform the merge of all the states in one parallel instance and then forward the merged state through a side output (side stream) inside the .processElement() function?
3. If the answers to (1) and (2) are yes, could you elaborate on how this is done?

Best,
Max

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintain-heavy-hitters-in-Flink-application-td16965.html
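For question 1, a union list state would hand every restored instance all of the partial states, and each instance could then merge them locally. The plain-Java sketch below shows only that merge step, for hypothetical per-key counts in the heavy-hitter style; the class name and the `Map<String, Long>` representation are assumptions, not Flink API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MergePartialCounts {

    // Merges the per-subtask partial counts (as a union redistribution would
    // deliver them to each restored instance) into one global count map.
    static Map<String, Long> merge(List<Map<String, Long>> partials) {
        Map<String, Long> global = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> global.merge(key, count, Long::sum));
        }
        return global;
    }

    public static void main(String[] args) {
        // Hypothetical partial counts from three parallel instances.
        List<Map<String, Long>> partials = List.of(
                Map.of("a", 3L, "b", 1L),
                Map.of("a", 2L),
                Map.of("b", 4L, "c", 1L));
        System.out.println(merge(partials));
    }
}
```

One design caveat: if every instance merges the full list and then snapshots the merged totals again, the counts would be multiplied on the next restore, so a merged global view should be kept separate from each instance's own partial state.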