Get which key groups are assigned to an operator

5 messages

gerardg
Hello,

To improve performance, we keep "keyed state" in the operator's memory:
basically a Map that holds the state for each key. The problem comes when
we want to restore the state after a failure or after rescaling the
operator. What we currently do is send the concatenation of all the state
to every operator instance using a union redistribution, and then restore
the in-memory state for a key the first time we see that key. After a
while, we simply clear the redistributed state. This is somewhat complex
and error-prone, so we would like to find an alternative.
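A minimal sketch of the lazy restore described above, in plain Java with no Flink types. The class and method names are hypothetical and not taken from Gerard's code. The sketch also makes one failure mode visible: a key that this instance owns but that does not show up before the redistributed copy is cleared loses its restored state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (plain Java, no Flink types) of the lazy union-restore pattern.
final class LazyUnionRestore<K, V> {
    // Per-key state this instance is actually serving.
    private final Map<K, V> inMemory = new HashMap<>();
    // Union of every instance's state, as received after a failure or rescale.
    private final Map<K, V> redistributed = new HashMap<>();

    void restoreFromUnion(Map<K, V> unionOfAllState) {
        inMemory.clear();
        redistributed.clear();
        redistributed.putAll(unionOfAllState);
    }

    // On the first record for a key, move that key's state out of the union copy.
    V stateFor(K key) {
        V value = inMemory.get(key);
        if (value == null) {
            value = redistributed.remove(key);
            if (value != null) {
                inMemory.put(key, value);
            }
        }
        return value;
    }

    void update(K key, V value) {
        inMemory.put(key, value);
    }

    // "After a while": drop the rest of the union copy. Any key owned by this
    // instance that has not been seen yet loses its restored state here.
    void clearRedistributed() {
        redistributed.clear();
    }

    int liveSize() { return inMemory.size(); }

    int pendingSize() { return redistributed.size(); }
}
```

The delayed `clearRedistributed()` is the fragile part: choosing "a while" too short drops state for rarely seen keys, and choosing it too long keeps every other instance's state in memory.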

As far as I know, Flink knows which keys belong to each operator instance
(it distributes key groups), so I guess it would be possible to compute the
key group of each stored key and restore the in-memory state all at once,
if we could access the key-group mapping. Is that possible? We could patch
Flink if necessary to access that information.
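A minimal sketch of the "restore at once" alternative, assuming each instance still receives the full union but also knows its own inclusive key-group range. `keyGroupOf` is a hypothetical placeholder for whatever computes a key's key group; nothing here is a Flink API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToIntFunction;

// Hypothetical sketch: eager restore by key-group ownership (plain Java, no Flink types).
final class EagerKeyGroupRestore {
    private EagerKeyGroupRestore() {}

    // Keep only the entries whose key group lies in this instance's inclusive
    // range [startKeyGroup, endKeyGroup]; everything else belongs to other instances.
    static <K, V> Map<K, V> restoreOwned(Map<K, V> unionOfAllState,
                                         ToIntFunction<? super K> keyGroupOf,
                                         int startKeyGroup,
                                         int endKeyGroup) {
        Map<K, V> owned = new HashMap<>();
        for (Map.Entry<K, V> entry : unionOfAllState.entrySet()) {
            int keyGroup = keyGroupOf.applyAsInt(entry.getKey());
            if (keyGroup >= startKeyGroup && keyGroup <= endKeyGroup) {
                owned.put(entry.getKey(), entry.getValue());
            }
        }
        return owned;
    }
}
```

Because each key group belongs to exactly one instance, the filtered maps across all instances partition the union, so neither the lazy lookup nor the delayed clearing is needed.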

Thanks,

Gerard



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Get which key groups are assigned to an operator

Stefan Richter
Hi,

From what I read, I get the impression that you are trying to implement your own "keyed state" with a hashmap? Why not use the keyed state that Flink already provides, which gives you efficient rescaling etc. out of the box? Please see [1] for the details.


On 20.02.2018 at 13:44, gerardg <[hidden email]> wrote:

Re: Get which key groups are assigned to an operator

gerardg
Hi Stefan, thanks

Yes, we are also using keyed state in other operators. The problem is that serialization is quite expensive, and in some operators we would prefer to avoid it by keeping the state in memory (in our use case, one specific operator with in-memory state gives at least a 30% throughput improvement). When we are not operating on a keyed stream it is easy: basically, all the operator instances hold the same in-memory state. What we would like is to do the same when operating on a keyed stream. Does that make more sense now?

We are using RocksDB as the state backend, and as far as I know elements always get serialized when they are stored in the state. I'm also not sure whether there is some disk access (maybe asynchronous) that could hurt performance.

Gerard

On Tue, Feb 20, 2018 at 2:42 PM, Stefan Richter <[hidden email]> wrote:

Re: Get which key groups are assigned to an operator

Stefan Richter
Hi,

OK, now I understand your goal a bit better. I would still like to point out that it may take more work than it looks. To name just one example, you probably also want to support asynchronous snapshots, which is likely difficult with just a hashmap. I think the proper solution for you (and also something we are considering supporting in the future) is to allow different state backends for different operators in a job, but that is currently not possible. To answer your other question: you can currently compute everything about key groups and their assignment to operators using the methods in org.apache.flink.runtime.state.KeyGroupRangeAssignment.
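For illustration, a standalone Java sketch of the arithmetic that, as far as I know, KeyGroupRangeAssignment performs: scramble the key's hashCode() with a murmur-style mixer, take it modulo the max parallelism to get the key group, and split the key groups into contiguous ranges, one per subtask. The Flink class exposes this through methods such as assignToKeyGroup, computeKeyGroupRangeForOperatorIndex and computeOperatorIndexForKeyGroup. This re-implementation is an assumption-laden sketch, so prefer calling the real class from the Flink version you run rather than copying the hash.

```java
import java.util.Objects;

// Standalone sketch of the key-group arithmetic (to my understanding) behind
// org.apache.flink.runtime.state.KeyGroupRangeAssignment. Not the Flink class itself.
final class KeyGroupMath {
    private KeyGroupMath() {}

    // Murmur3-style scrambling of a 32-bit hash code, folded to a non-negative int.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // Murmur3 finalization mix.
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    // Key group of a key: scrambled hashCode() modulo the max parallelism.
    static int keyGroupFor(Object key, int maxParallelism) {
        Objects.requireNonNull(key, "key must not be null");
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // Inclusive key-group range [start, end] owned by subtask operatorIndex.
    static int[] keyGroupRangeFor(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    // Subtask that owns a given key group (inverse of keyGroupRangeFor).
    static int operatorIndexFor(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }
}
```

Inside a rich function, the inputs would presumably be getRuntimeContext().getMaxNumberOfParallelSubtasks(), getNumberOfParallelSubtasks() and getIndexOfThisSubtask(). The key passed in must be the one the KeySelector returns, and its hashCode() must be stable across JVMs (enums and identity hash codes are not).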

Best,
Stefan

On 20.02.2018 at 14:52, Gerard Garcia <[hidden email]> wrote:

Hi Stefan, thanks

Yes, we are also using keyed state in other operators the problem is that serialization is quite expensive and in some of them we would prefer to avoid it by storing the state in memory (for our use case one specific operator with in memory state gives at least a 30% throughput improvement). When we are not operating in a keyed stream is easy, basically all the operators have the same in memory state, what we would like to do is the same but when we are operating in a keyed stream. Does it make more sense now?

We are using rocksdb as state backend and as far as I know elements get always serialized when stored in the state and I'm not sure if there is even some disk access (maybe not synchronously) that could hurt performance.

Gerard

On Tue, Feb 20, 2018 at 2:42 PM, Stefan Richter <[hidden email]> wrote:
Hi,

from what I read, I get the impression that you attempt to implement you own "keyed state" with a hashmap? Why not using the keyed state that is already provided by Flink and gives you efficient rescaling etc. out of the box? Please see [1] for the details.


Am 20.02.2018 um 13:44 schrieb gerardg <[hidden email]>:

Hello,

To improve performance we have " keyed state" in the operator's memory,
basically we keep a Map which contains the state per each of the keys. The
problem comes when we want to restore the state after a failure or after
rescaling the operator. What we are doing is sending the concatenation of
all the state to every operator using an union redistribution and then we
restore the "in memory state" every time we see a new key. Then, after a
while, we just clear the redistributed state. This is somewhat complex and
prone to errors so we would like to find an alternative way of doing this.

As far as I know Flink knows which keys belong to each operator
(distributing key groups) so I guess it would be possible to somehow
calculate the key id from each of the stored keys and restore the in memory
state at once if we could access to the key groups mapping. Is that
possible? We could patch Flink if necessary to access that information.

Thanks,

Gerard



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Reply | Threaded
Open this post in threaded view
|

Re: Get which key groups are assigned to an operator

gerardg
You are right that the best solution would probably be to allow different state backends for different operators; I hope it gets implemented at some point. Meanwhile, I'll take a look at the methods in org.apache.flink.runtime.state.KeyGroupRangeAssignment; maybe I can find a workaround that is good enough for me.
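One detail that matters for such a workaround: the key-group arithmetic takes the operator's max parallelism as input, and the number of key groups equals that value. If it is not set explicitly, Flink derives a default from the parallelism; to my understanding it rounds 1.5 times the parallelism up to a power of two and clamps the result to [128, 32768]. The sketch below mirrors that rule (an assumption; check computeDefaultMaxParallelism in your Flink version), which is one reason to call setMaxParallelism explicitly so the number of key groups stays predictable.

```java
// Hypothetical re-implementation of Flink's default max-parallelism rule (as I understand it).
final class DefaultMaxParallelism {
    static final int LOWER_BOUND = 1 << 7;  // 128
    static final int UPPER_BOUND = 1 << 15; // 32768

    private DefaultMaxParallelism() {}

    // Smallest power of two >= x, for 1 <= x <= 2^30.
    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    // Round 1.5x the parallelism up to a power of two, then clamp to [128, 32768].
    static int compute(int parallelism) {
        int candidate = roundUpToPowerOfTwo(parallelism + parallelism / 2);
        return Math.min(Math.max(candidate, LOWER_BOUND), UPPER_BOUND);
    }
}
```

For example, under this rule parallelism 4 yields 128 key groups and parallelism 100 yields 256.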

Thanks,

Gerard

On Tue, Feb 20, 2018 at 3:56 PM, Stefan Richter <[hidden email]> wrote: