A way to control redistribution of operator state?

Dmitry Golubets
Hi,

It looks like it is currently impossible to implement keyed state on top of operator state.

I know it sounds like "just use keyed state", but the latter requires updating the state on every value change, unlike operator state, and thus can be expensive (especially if you have to deal with mutable structures inside it that have to be serialized).

The problem is that there is no way to tell Flink how to reassign the parts of a savepoint between partitions, so it is impossible to route data to the correct partitions.

Is there anything I missed, or maybe a plan to implement this in the future?

Best regards,
Dmitry
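
To make the cost argument concrete, here is a toy model (not Flink code; the class and method names are hypothetical). It assumes a keyed backend that serializes the whole value on every update, as an off-heap store such as RocksDB does, and compares it with a mutable heap structure that is serialized only when a checkpoint is taken.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Toy cost model, NOT Flink code: counts how often a mutable per-key structure
// is serialized under two hypothetical update strategies.
class SerializationCostSketch {

    // Stand-in for a state serializer: writes the map's size, then its entries.
    static byte[] serialize(Map<String, Long> value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(value.size());
            for (Map.Entry<String, Long> e : value.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeLong(e.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Strategy 1 (assumed keyed backend): the structure is written back,
    // and therefore serialized, after every single change.
    static int writeThroughSerializations(int updates) {
        Map<String, Long> structure = new HashMap<>();
        int serializations = 0;
        for (int i = 0; i < updates; i++) {
            structure.merge("k" + (i % 10), 1L, Long::sum);
            serialize(structure); // e.g. the equivalent of state.update(structure)
            serializations++;
        }
        return serializations;
    }

    // Strategy 2 (operator-state style): mutate in place on the heap and
    // serialize only when a checkpoint is taken.
    static int snapshotOnlySerializations(int updates, int checkpointInterval) {
        Map<String, Long> structure = new HashMap<>();
        int serializations = 0;
        for (int i = 0; i < updates; i++) {
            structure.merge("k" + (i % 10), 1L, Long::sum);
            if ((i + 1) % checkpointInterval == 0) {
                serialize(structure); // e.g. inside a snapshot callback
                serializations++;
            }
        }
        return serializations;
    }

    public static void main(String[] args) {
        System.out.println(writeThroughSerializations(10_000));        // 10000
        System.out.println(snapshotOnlySerializations(10_000, 1_000)); // 10
    }
}
```

With 10,000 updates and a checkpoint every 1,000 updates, the first strategy serializes the structure 10,000 times and the second only 10 times. A heap-based keyed backend would not serialize on update, so this only illustrates the expensive case described in the question.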

Re: A way to control redistribution of operator state?

Tzu-Li (Gordon) Tai
Hi Dmitry,

Technically, judging from the internal code around `OperatorStateRepartitioner`, I think it could certainly be made pluggable.
Right now it is just hard-coded to use a round-robin repartitioner implementation by default.

However, I'm not sure about the plans for exposing this to users and making it configurable.
Looping in Stefan (in cc), who mostly worked on this part, to see if he can provide more info.

- Gordon

In reply to Dmitry Golubets (February 14, 2017, 2:30:27 AM)
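
Gordon's point about pluggability can be illustrated with a simplified model. The `Repartitioner` interface below is hypothetical and does not match the signature of Flink's internal `OperatorStateRepartitioner`; `roundRobin` models a content-agnostic default, while `byRoute` places each entry where hash routing would send records with the same key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.ToIntFunction;

// Hypothetical plug-in point, NOT Flink's API: it receives every list-state
// entry of all old subtasks and assigns each entry to one of the new subtasks.
class RepartitionerSketch {

    interface Repartitioner<T> {
        List<List<T>> repartition(List<T> entries, int newParallelism);
    }

    static <T> List<List<T>> emptyBuckets(int n) {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i < n; i++) buckets.add(new ArrayList<>());
        return buckets;
    }

    // Simplified content-agnostic default: entry i goes to subtask i % n.
    static <T> Repartitioner<T> roundRobin() {
        return (entries, n) -> {
            List<List<T>> buckets = emptyBuckets(n);
            for (int i = 0; i < entries.size(); i++) buckets.get(i % n).add(entries.get(i));
            return buckets;
        };
    }

    // Key-aware alternative: each entry goes where hash routing sends its key.
    static <T> Repartitioner<T> byRoute(ToIntFunction<T> keyHash) {
        return (entries, n) -> {
            List<List<T>> buckets = emptyBuckets(n);
            for (T e : entries) buckets.get(Math.floorMod(keyHash.applyAsInt(e), n)).add(e);
            return buckets;
        };
    }

    // Counts keys whose state landed on a subtask other than the one that
    // hash routing (floorMod(hashCode, n)) would send their records to.
    static int misrouted(List<List<String>> buckets) {
        int n = buckets.size(), bad = 0;
        for (int j = 0; j < n; j++)
            for (String key : buckets.get(j))
                if (Math.floorMod(key.hashCode(), n) != j) bad++;
        return bad;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f");
        Repartitioner<String> rr = roundRobin();
        Repartitioner<String> routed = byRoute(String::hashCode);
        System.out.println(misrouted(rr.repartition(keys, 3)));     // 6
        System.out.println(misrouted(routed.repartition(keys, 3))); // 0
    }
}
```

For these six keys and three subtasks, every entry lands on the "wrong" subtask under the round-robin model and none does under the key-aware one, which is the routing problem from the original question in miniature.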

Re: A way to control redistribution of operator state?

Stefan Richter
Hi,

there is something that we call "raw keyed" operator state, which might serve exactly your purpose. It is already used internally by Flink's window operator, but there is currently no public API for this feature. The way it currently works is that you obtain input and output streams that are aware of the key-groups being written or read, but the API needs to account for the fact that each key-group must be written only once, and completely, before the next key-group can start. This is a bit tricky to expose for inheritance hierarchies. My guess is that you can expect this in the next version of Flink.

Best,
Stefan

In reply to Tzu-Li (Gordon) Tai (February 14, 2017, 08:31)
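
The constraint Stefan describes (each key-group is written once, and completely, before the next one starts) can be sketched as a writer that enforces it. This is a hypothetical class, not Flink's internal raw keyed state API; it records the start offset of each key-group so that a reader could later seek directly to the groups it owns.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical writer, NOT Flink's API. It only enforces the contract that each
// key-group is written once and is complete before the next one starts.
class KeyGroupWriterSketch {
    private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    private final DataOutputStream out = new DataOutputStream(bytes);
    private final Set<Integer> completed = new HashSet<>();
    // Start offset of each key-group, so a reader could seek straight to it.
    private final Map<Integer, Integer> offsets = new LinkedHashMap<>();
    private Integer current;

    void startKeyGroup(int keyGroup) {
        // Reject reopening a finished group or restarting the current one.
        if (completed.contains(keyGroup) || (current != null && current == keyGroup)) {
            throw new IllegalStateException("key-group " + keyGroup + " was already written");
        }
        if (current != null) {
            completed.add(current); // starting a new group completes the previous one
        }
        current = keyGroup;
        offsets.put(keyGroup, out.size());
    }

    void write(String entry) {
        if (current == null) {
            throw new IllegalStateException("no key-group started");
        }
        try {
            out.writeUTF(entry); // 2-byte length prefix plus the encoded characters
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    Map<Integer, Integer> offsets() {
        return offsets;
    }

    public static void main(String[] args) {
        KeyGroupWriterSketch writer = new KeyGroupWriterSketch();
        writer.startKeyGroup(0);
        writer.write("a");
        writer.write("b");
        writer.startKeyGroup(1);
        writer.write("c");
        System.out.println(writer.offsets()); // {0=0, 1=6}
        try {
            writer.startKeyGroup(0); // reopening group 0 violates the contract
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // key-group 0 was already written
        }
    }
}
```

One plausible reading of the inheritance remark: if a base operator and a subclass both want to write data for the same key-group, they cannot each open it separately, so the API would have to interleave their writes within a single pass over the key-groups.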

Re: A way to control redistribution of operator state?

Dmitry Golubets
Hi,

I was playing with it more today and I think I've found a workaround.

Here is what I do:
1. I define a constant number N of logical groups.
2. I use a consistent hash mapping of data keys to these groups.
3. I map these groups to partitions using an even distribution (the same way Flink distributes state).
4. In the stateful function, I calculate which groups are assigned to that partition and produce the right number of states, one for each group (empty states too).
5. I do manual partitioning before that stateful function, using the same group calculations.
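
Steps 1 to 5 can be sketched in plain Java as follows. `N`, the hash function and every name are hypothetical; a stable hash-modulo mapping stands in for the consistent hashing of step 2, and the contiguous split of step 3 is an assumption about how Flink distributes list state. In a job, `partitionForKey` would presumably be called from a custom `Partitioner` passed to `DataStream.partitionCustom`.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the workaround: keys -> N logical groups -> partitions.
// N, the hash choice and all names here are hypothetical.
class LogicalGroups {
    // Step 1: a constant number of logical groups; it must not change
    // between a savepoint and a restore.
    static final int N = 12;

    // Step 2: stable key -> group mapping. String.hashCode is specified by
    // the JDK, so it is identical in every JVM (unlike Object.hashCode).
    static int groupForKey(String key) {
        return Math.floorMod(key.hashCode(), N);
    }

    // Step 3: split the N groups into contiguous ranges that are as even as
    // possible; the first (N % parallelism) partitions get one extra group.
    static int[] groupRange(int partition, int parallelism) {
        int base = N / parallelism, extra = N % parallelism;
        int start = partition * base + Math.min(partition, extra);
        return new int[] {start, start + base + (partition < extra ? 1 : 0)};
    }

    // Step 4: the groups a subtask owns, empty ones included, in ascending order.
    static List<Integer> groupsOf(int partition, int parallelism) {
        int[] r = groupRange(partition, parallelism);
        List<Integer> groups = new ArrayList<>();
        for (int g = r[0]; g < r[1]; g++) groups.add(g);
        return groups;
    }

    // Step 5: manual partitioning inverts groupRange, so a record is sent
    // to the subtask that owns its key's group.
    static int partitionForKey(String key, int parallelism) {
        int group = groupForKey(key);
        for (int p = 0; p < parallelism; p++) {
            int[] r = groupRange(p, parallelism);
            if (group >= r[0] && group < r[1]) return p;
        }
        throw new IllegalStateException("group " + group + " is not owned by any partition");
    }

    public static void main(String[] args) {
        System.out.println(groupsOf(0, 5)); // [0, 1, 2]
        System.out.println(groupsOf(4, 5)); // [10, 11]
        System.out.println(partitionForKey("user-42", 5));
    }
}
```

The scheme resembles Flink's internal key groups, which split a fixed maximum parallelism into contiguous ranges per subtask; the difference is that here the application, not Flink, owns the mapping.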

So far, scaling up and down seems to result in correct behavior.
Can I rely on Flink distributing state evenly, and in the order in which I return it in the list?

Best regards,
Dmitry

In reply to Stefan Richter (Tue, February 14, 2017, 9:33 AM)
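
Whether the workaround survives rescaling depends on the question above. The simulation below makes the assumption explicit: it models the restore step as a contiguous, nearly even split of all old subtasks' list entries in the order they were returned, and contrasts it with an interleaved split. It is not Flink code, and which model matches a given Flink version is exactly the implementation detail in question.

```java
import java.util.ArrayList;
import java.util.List;

// Simulation under an ASSUMED restore strategy; not Flink code.
class RestoreOrderCheck {
    static final int N = 12; // hypothetical number of logical groups

    // [start, end) of chunk `part` when `total` items are split into `parts`
    // contiguous, nearly equal chunks (the first total % parts get one extra).
    static int[] chunk(int part, int parts, int total) {
        int base = total / parts, extra = total % parts;
        int start = part * base + Math.min(part, extra);
        return new int[] {start, start + base + (part < extra ? 1 : 0)};
    }

    // Snapshot: each old subtask returns one entry (here just the group id)
    // per owned group, in ascending order; entries are concatenated by subtask.
    static List<Integer> snapshotAll(int oldParallelism) {
        List<Integer> all = new ArrayList<>();
        for (int p = 0; p < oldParallelism; p++) {
            int[] r = chunk(p, oldParallelism, N);
            for (int g = r[0]; g < r[1]; g++) all.add(g);
        }
        return all;
    }

    // Model A: subtask q receives the q-th contiguous slice of the list.
    static List<Integer> restoreContiguous(List<Integer> all, int q, int newParallelism) {
        int[] r = chunk(q, newParallelism, all.size());
        return all.subList(r[0], r[1]);
    }

    // Model B: subtask q receives every newParallelism-th entry, starting at q.
    static List<Integer> restoreInterleaved(List<Integer> all, int q, int newParallelism) {
        List<Integer> mine = new ArrayList<>();
        for (int i = q; i < all.size(); i += newParallelism) mine.add(all.get(i));
        return mine;
    }

    // True if subtask q received exactly the groups the partitioner routes to it.
    static boolean consistent(List<Integer> restored, int q, int newParallelism) {
        int[] r = chunk(q, newParallelism, N);
        for (int g : restored) if (g < r[0] || g >= r[1]) return false;
        return restored.size() == r[1] - r[0];
    }

    // Checks every old/new parallelism pair from 1 to N under one model.
    static boolean allConsistent(boolean contiguous) {
        for (int oldP = 1; oldP <= N; oldP++) {
            List<Integer> all = snapshotAll(oldP);
            for (int newP = 1; newP <= N; newP++)
                for (int q = 0; q < newP; q++) {
                    List<Integer> restored = contiguous
                            ? restoreContiguous(all, q, newP)
                            : restoreInterleaved(all, q, newP);
                    if (!consistent(restored, q, newP)) return false;
                }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(allConsistent(true));  // true
        System.out.println(allConsistent(false)); // false
    }
}
```

Under the contiguous model every subtask receives exactly the groups its partitioner routes to it, for every pair of old and new parallelism up to `N`; under the interleaved model it does not, so the workaround stands or falls with the restore strategy.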


Re: A way to control redistribution of operator state?

Stefan Richter
Hi,

I think the clean solution would be to use raw keyed state once it becomes available. In the meantime, your solution could work. However, you should be aware that your approach relies not on a contract but on an implementation detail that *could* change between versions and break your code in subtle ways.

Best,
Stefan

In reply to Dmitry Golubets (February 14, 2017, 12:19)
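
One way to turn a subtle breakage into a loud one, while still relying on this implementation detail, is to store the group id next to each state entry and validate it on restore. The sketch below is hypothetical (`N`, `GroupState` and the range function are assumptions carried over from the workaround); it does not fix a changed redistribution, it only detects it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical defensive restore: every snapshotted entry carries its group id,
// so an unexpected redistribution fails at restore time instead of silently
// pairing records with the wrong state.
class GroupTaggedRestore {
    static final int N = 12; // hypothetical number of logical groups

    static final class GroupState {
        final int group;
        final Map<String, Long> values;

        GroupState(int group, Map<String, Long> values) {
            this.group = group;
            this.values = values;
        }
    }

    // Contiguous, nearly even range of groups owned by `partition` (assumed layout).
    static int[] ownedRange(int partition, int parallelism) {
        int base = N / parallelism, extra = N % parallelism;
        int start = partition * base + Math.min(partition, extra);
        return new int[] {start, start + base + (partition < extra ? 1 : 0)};
    }

    // Rebuilds the per-group map; throws if a group is foreign, duplicated or missing.
    static Map<Integer, Map<String, Long>> restore(List<GroupState> restored, int partition, int parallelism) {
        int[] r = ownedRange(partition, parallelism);
        Map<Integer, Map<String, Long>> byGroup = new TreeMap<>();
        for (GroupState s : restored) {
            if (s.group < r[0] || s.group >= r[1] || byGroup.containsKey(s.group)) {
                throw new IllegalStateException("subtask " + partition + " received unexpected group " + s.group);
            }
            byGroup.put(s.group, s.values);
        }
        if (byGroup.size() != r[1] - r[0]) {
            throw new IllegalStateException("subtask " + partition + " is missing groups");
        }
        return byGroup;
    }

    public static void main(String[] args) {
        List<GroupState> expected = new ArrayList<>();
        for (int g = 0; g < 4; g++) expected.add(new GroupState(g, new TreeMap<>()));
        System.out.println(restore(expected, 0, 3).keySet()); // [0, 1, 2, 3]

        List<GroupState> interleaved = new ArrayList<>();
        for (int g = 0; g < N; g += 3) interleaved.add(new GroupState(g, new TreeMap<>()));
        try {
            restore(interleaved, 0, 3);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // subtask 0 received unexpected group 6
        }
    }
}
```

If a future version redistributed entries differently, a job built this way would fail at restore time with a clear message rather than processing records against the wrong state.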