Managed operator state treating state of all parallel operators as the same

gerardg
Hello,

Is it possible to have managed operator state that all parallel instances of an operator know to be identical? That way it would be snapshotted by only one of them, and on failure (or rescaling) all instances would restore from that one snapshot.

For the rescaling case I guess I could use union redistribution and keep just one element of the list, but given the size of the state it would be more efficient to avoid that.
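The union-redistribution workaround described above can be modeled in plain Java to show where its cost comes from. This is a sketch, not Flink code: the class and method names are hypothetical, and the only behavior it assumes is that, with union redistribution, every parallel instance receives the complete list on restore. Because every subtask writes its own full copy, the checkpoint holds parallelism times the state size.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical plain-Java model (no Flink dependency): each subtask contributes
// one full copy of the identical state to a union list; on restore each keeps one.
final class KeepOneOfUnionModel {

    // Snapshot: every one of `parallelism` subtasks adds its own full copy of
    // the shared state as a single element of the union list.
    static List<List<String>> snapshotAll(List<String> sharedState, int parallelism) {
        List<List<String>> unionList = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            unionList.add(new ArrayList<>(sharedState));
        }
        return unionList;
    }

    // Restore: union redistribution hands the whole list to each subtask,
    // which keeps the first copy and discards the others.
    static List<String> restoreKeepFirst(List<List<String>> unionList) {
        return unionList.isEmpty() ? Collections.<String>emptyList() : unionList.get(0);
    }

    // Entries written to the checkpoint: parallelism times the state size.
    static int checkpointedEntries(List<List<String>> unionList) {
        int total = 0;
        for (List<String> copy : unionList) {
            total += copy.size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<List<String>> union = snapshotAll(List.of("a", "b", "c"), 4);
        System.out.println(restoreKeepFirst(union));    // [a, b, c]
        System.out.println(checkpointedEntries(union)); // 12
    }
}
```

With a parallelism of 4 and three state entries, twelve entries are checkpointed even though only three are ever used after restore.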

Thanks,

Gerard

Re: Managed operator state treating state of all parallel operators as the same

Fabian Hueske-2
Hi Gerard,

Such a type of state is not supported yet, but it is on the roadmap.
The feature is called Broadcast State; it is described in JIRA [1].

Best, Fabian

(In reply to gerardg, 2017-07-03 17:50 GMT+02:00)

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Managed-operator-state-treating-state-of-all-parallel-operators-as-the-same-tp14102.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Managed operator state treating state of all parallel operators as the same

Stefan Richter
Hi,

I think you can already do this: take a look at OperatorStateStore::getUnionListState(…). You could add state to it on only one parallel operator instance, e.g. by checking the subtask index.

Best,
Stefan

(In reply to Fabian Hueske, 2017-07-03 18:14)

Re: Managed operator state treating state of all parallel operators as the same

gerardg
Thanks Fabian, I'll keep an eye on that JIRA issue.

I'm not sure I follow you, Stefan. Do you mean that I could implement my own OperatorStateStore and override its methods (e.g. snapshot and restore) to achieve this? I don't think I know enough about Flink's internals to implement that easily.

Gerard

Re: Managed operator state treating state of all parallel operators as the same

Stefan Richter
What I mean is that you can obtain such a state in initializeState(…):

public void initializeState(FunctionInitializationContext context) throws Exception {
    context.getOperatorStateStore().getUnionListState(…);
}

Then, in snapshotState(…), you add the state to the list in only one of the parallel instances. Which instance does this can be decided by the subtask index (e.g. only add to the list state if your subtask index is 0). You can obtain the subtask index with getRuntimeContext().getIndexOfThisSubtask().

On restore, union list state hands the complete list, i.e. everything that any instance added, to every parallel instance, so what was checkpointed by one operator instance is available to all of them.
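The single-writer scheme described above, including a rescale, can be modeled in plain Java. This is a sketch under stated assumptions, not Flink code: the class and method names are hypothetical, the `subtaskIndex == 0` check stands in for getRuntimeContext().getIndexOfThisSubtask(), and `restore` stands in for the union redistribution that getUnionListState(…) provides (every new instance receives the concatenation of all contributions).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model (no Flink dependency): only subtask 0 adds the
// shared state to a union list; every subtask receives it on restore.
final class SingleWriterUnionModel {

    // Snapshot, per subtask: mirrors "only add to the list state if the subtask
    // index is 0". All other subtasks contribute an empty list.
    static List<String> snapshot(int subtaskIndex, List<String> sharedState) {
        return subtaskIndex == 0 ? new ArrayList<>(sharedState) : new ArrayList<>();
    }

    // Union redistribution: the concatenation of every subtask's contribution is
    // handed to each of the (possibly differently many) new subtasks.
    static List<List<String>> restore(List<List<String>> snapshots, int newParallelism) {
        List<String> union = new ArrayList<>();
        for (List<String> s : snapshots) {
            union.addAll(s);
        }
        List<List<String>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>(union));
        }
        return restored;
    }

    // One checkpoint at oldParallelism followed by a restore at newParallelism.
    static List<List<String>> checkpointAndRescale(List<String> sharedState,
                                                   int oldParallelism,
                                                   int newParallelism) {
        List<List<String>> snapshots = new ArrayList<>();
        for (int i = 0; i < oldParallelism; i++) {
            snapshots.add(snapshot(i, sharedState));
        }
        return restore(snapshots, newParallelism);
    }

    public static void main(String[] args) {
        List<List<String>> restored = checkpointAndRescale(List.of("a", "b"), 3, 5);
        System.out.println(restored.size()); // 5
        System.out.println(restored);        // [[a, b], [a, b], [a, b], [a, b], [a, b]]
    }
}
```

Only one copy of the state is checkpointed regardless of parallelism. The trade-off is that the scheme relies on all instances really holding identical state, because the copies on the other subtasks are never written to the checkpoint.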

Best,
Stefan

(In reply to gerardg, 2017-07-04 13:56)

Re: Managed operator state treating state of all parallel operators as the same

Gyula Fóra
Hi,

+1 to what Stefan is suggesting; we have been using similar logic for a while:

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    updateBroadcastState();
    super.snapshotState(context);
}

@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    initBroadcastState();
}


Gyula


(In reply to Stefan Richter, Tue, 2017-07-04 15:19)

Re: Managed operator state treating state of all parallel operators as the same

m@xi
Hello Gyula & Stefan,

Here is a similar situation that I am trying to resolve [1].

I am also using *managed operator state*, but I find the Flink documentation
on it rather unclear.

So, I have the following questions:
1 -- Can I concatenate the partial states of all parallel instances of, for
example, a .process() operator?
My guess: yes. I should use union list state, but I do not know how.

2 -- Can I perform the merge of all the state in one parallel instance and
then forward the result through a side stream from inside the
.processElement() function?

3 -- If (1) and (2) can be answered affirmatively, could you elaborate on
how this is done?
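Question 1 can be illustrated with a plain-Java model of union redistribution in which every subtask contributes its partial state. This is a sketch, not Flink code: the class, the method names and the key/count state layout are hypothetical. One caveat applies to the real feature: union list state only hands the concatenated list to the instances when state is restored (recovery or rescaling from a checkpoint or savepoint); during normal processing each instance still sees only its own list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model (no Flink dependency): every subtask snapshots
// its partial counts; on restore each subtask receives all of them and merges.
final class UnionPartialStateModel {

    // Union redistribution: concatenate every subtask's list of partial states.
    static List<Map<String, Long>> union(List<List<Map<String, Long>>> perSubtask) {
        List<Map<String, Long>> all = new ArrayList<>();
        for (List<Map<String, Long>> partials : perSubtask) {
            all.addAll(partials);
        }
        return all;
    }

    // What a restored subtask could do with the union: sum the partial counts.
    static Map<String, Long> merge(List<Map<String, Long>> partials) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> merged.merge(key, count, Long::sum));
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Map<String, Long>> restored = union(List.of(
                List.of(Map.of("x", 2L, "y", 1L)), // subtask 0
                List.of(Map.of("x", 3L)),          // subtask 1
                List.of(Map.of("z", 4L))));        // subtask 2
        System.out.println(merge(restored).get("x")); // 5
    }
}
```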

Best,
Max

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintain-heavy-hitters-in-Flink-application-td16965.html



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/