Questions regarding Key Managed state

KristoffSC
Hi
I have a few questions regarding Flink's state.

Let's say we have:

Case 1:
stream.keyBy(...).process(myProcessFunction).setParallelism(3)

myProcessFunction uses managed state (MapState, ListState, etc.), and I'm
using checkpointing.

Flink will distribute events across the 3 instances of myProcessFunction
according to the keyBy function.
When the job is restarted with the same parallelism, state is recovered
from the last checkpoint and traffic is distributed across the process
function instances in the same manner.

What will happen, though, if I increase the parallelism to 4?
The traffic will then be distributed across 4 instances, so a key that was
originally routed to operator 3 may now go to operator 4. What will happen
to the managed state that was originally built for this key? Will it be
accessible from the new operator instance? From [1], where I read "Flink
is able to automatically redistribute state when the parallelism is changed,
and also do better memory management.", I assume YES.

Case 2:
Let's assume I have two operators, each of which uses managed state.
From the documentation I read that managed state can be used only on
a keyed stream. Does this mean I will have to key my stream twice (or more)
if I want to use managed state in all of my operators? What if the actual
keyBy function is the same for the whole pipeline? Each keyBy hurts
performance, right?


Case 3:
Is it possible to use managed state on a non-keyed stream? For example,
I have a process function that holds a "map of key-value mappings". This map
can be delivered/built using the broadcast state pattern and can be quite big.
It sounds like a good place to use MapState, but the stream is not keyed.
How can I approach this?

Let's assume for all cases that I'm using the RocksDB state backend.

Thanks,


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html







--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Questions regarding Key Managed state

Timo Walther
Hi Kristoff,

case 1:

First of all, Flink internally groups keys into so-called "key groups"
to reduce the management overhead. The maximum parallelism determines
the number of key groups. When rescaling, the key groups are
redistributed using a consistent-hashing-like scheme (each operator
instance is assigned a contiguous range of key groups), and the traffic
for a key ends up at the operator instance that now holds that key's group.

So the answer is yes: Flink takes care of redistributing managed
state and routing new incoming data to its new location.
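The following minimal sketch illustrates the key-group routing described above. It is plain Java and does not use Flink's classes. It assumes a maximum parallelism of 128, and as a simplification it replaces the murmur hash that Flink applies to key.hashCode() with a plain modulo. The operatorIndexFor formula reflects my understanding of how Flink assigns contiguous key-group ranges to instances; it is not copied from Flink's source. The point it shows is that a key's key group depends only on the key and the maximum parallelism. Rescaling from 3 to 4 therefore only changes which instance owns that group, and the group's state moves with it.

```java
/**
 * Plain-Java model of key-group routing (not Flink's own classes).
 * ASSUMPTION: Flink applies a murmur hash to key.hashCode() before the modulo;
 * a plain floorMod of hashCode() stands in for it here.
 */
final class KeyGroupSketch {
    private KeyGroupSketch() {}

    // The key group depends only on the key and maxParallelism, never on the
    // current parallelism, so it stays the same when the job is rescaled.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Each operator instance owns a contiguous range of key groups; rescaling
    // only changes which (0-based) instance owns which range.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value for this sketch
        for (String key : new String[] {"user-1", "user-2", "user-3", "user-4"}) {
            int kg = keyGroupFor(key, maxParallelism);
            System.out.printf("%s -> key group %d: instance %d at p=3, instance %d at p=4%n",
                    key, kg,
                    operatorIndexFor(kg, maxParallelism, 3),
                    operatorIndexFor(kg, maxParallelism, 4));
        }
    }
}
```

Running main prints, for each key, its key group and the 0-based instance that owns it at parallelism 3 and at parallelism 4. Because the key group is derived from the maximum parallelism, that value has to stay the same when keyed state is restored; only the parallelism can change.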

case 2:

If the key remains constant, you are losing the power of parallelizing
your pipeline. You could introduce some artificial but deterministic key
in a map function before the keyBy to solve this problem.
But there is also operator state, which is a special kind of managed
state that does not require a keyBy [1]. Or you might use a
BroadcastProcessFunction, depending on your use case [2].
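Here is a minimal plain-Java sketch of the "artificial but deterministic key" idea. The ArtificialKey class, the bucketFor method, the record ids and the bucket count of 8 are all hypothetical. In a Flink job, the computed bucket would be attached to each record in a map function and then used as the keyBy selector.

```java
/**
 * Hypothetical helper (not part of Flink): derives an artificial but
 * deterministic key, e.g. inside a map() placed before keyBy(...), so records
 * whose natural key is constant can still be spread over several buckets.
 */
final class ArtificialKey {
    private ArtificialKey() {}

    // Deterministic: the same recordId always maps to the same bucket, so the
    // keyed state for that record is always found under the same key.
    static int bucketFor(String recordId, int numBuckets) {
        if (numBuckets <= 0) {
            throw new IllegalArgumentException("numBuckets must be positive");
        }
        return Math.floorMod(recordId.hashCode(), numBuckets);
    }

    public static void main(String[] args) {
        int numBuckets = 8; // assumed; more buckets than the planned parallelism
        // "order-17" appears twice and lands in the same bucket both times.
        for (String id : new String[] {"order-17", "order-42", "order-17"}) {
            System.out.println(id + " -> bucket " + bucketFor(id, numBuckets));
        }
    }
}
```

With this approach, keyed state is scoped per bucket rather than per original key, so anything that must be global needs a further aggregation step. Determinism matters here. String.hashCode() is specified by the JDK and gives the same result on every JVM, whereas a key type that falls back to the default identity hashCode would not route consistently. Using noticeably more buckets than the parallelism also makes it more likely that every instance receives work.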

case 3:

I think the KeyedBroadcastProcessFunction [2] is exactly what you are
looking for. It allows you to maintain a map (the broadcast state) in
every parallel operator instance. In the example in the docs, it maps a
String to a Rule.
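The following plain-Java model (not Flink's API) sketches what "a map per operator" means in the broadcast state pattern:

- each parallel instance keeps its own copy of the map;
- every broadcast element is applied to every copy;
- a keyed element is routed to exactly one instance, which only reads its local copy.

The String-to-String rules and the hashCode-based routing are assumptions made for illustration. In Flink's KeyedBroadcastProcessFunction, processElement only gets read-only access to the broadcast state, which is what keeps the copies identical. Also, as far as I know, the broadcast state documentation [2] notes that broadcast state is kept in memory rather than in RocksDB. This is worth checking, given that the map can be quite big.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model of the broadcast state pattern (not Flink's API).
 * Assumptions: rules are String -> String, and keyed elements are routed to an
 * instance by a simple floorMod of the key's hashCode.
 */
final class BroadcastModel {
    // One independent copy of the broadcast map per parallel instance.
    private final List<Map<String, String>> perInstanceMaps = new ArrayList<>();

    BroadcastModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            perInstanceMaps.add(new HashMap<>());
        }
    }

    // Broadcast side: every instance receives the update and applies it to
    // its own copy, so all copies stay identical.
    void broadcast(String ruleName, String rule) {
        for (Map<String, String> copy : perInstanceMaps) {
            copy.put(ruleName, rule);
        }
    }

    // Keyed side: the element reaches exactly one instance, which only reads
    // its local copy (read-only, like processElement in Flink).
    String lookup(String key, String ruleName) {
        int instance = Math.floorMod(key.hashCode(), perInstanceMaps.size());
        return perInstanceMaps.get(instance).get(ruleName);
    }

    boolean allCopiesEqual() {
        for (Map<String, String> copy : perInstanceMaps) {
            if (!copy.equals(perInstanceMaps.get(0))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        BroadcastModel model = new BroadcastModel(3);
        model.broadcast("rule-a", "amount > 100");
        // Both lookups print "amount > 100", whichever instance they hit.
        System.out.println(model.lookup("customer-1", "rule-a"));
        System.out.println(model.lookup("customer-2", "rule-a"));
    }
}
```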

Regards,
Timo


[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#operator-state
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/broadcast_state.html


On 02.04.20 10:56, KristoffSC wrote:



Re: Questions regarding Key Managed state

KristoffSC
Thank you for your answers.

I have one more question.
Is keyed managed state on a keyed stream scoped per key or per operator?

For example, I have a keyed stream that is processed by MyProcessFunction
with parallelism = 3, so I have three instances of MyProcessFunction. The
process function has a keyed managed state field (ValueState, ListState, etc.).

There will be quite a big number of distinct key values in my stream.
Given this, will each state exist per key value, per function instance, or
maybe per key group?




Re: Questions regarding Key Managed state

Congxian Qiu
Hi

Many keys can live in a single state (each state can contain multiple key groups, and every key is assigned to its corresponding key group).

If you write a custom process function that uses a state you created, there is only one user state in each instance (not counting any internal state Flink may keep underneath), and all the keys that go to this instance use this user state. The value you read or write through it is always the one belonging to the key of the record currently being processed.
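The following plain-Java model (not Flink's actual implementation) illustrates the point above. A process function holds a single state handle per instance, but every read or write is scoped to the current key. KeyedValueStateModel and PerKeyCounter are hypothetical names. setCurrentKey stands in for what the Flink runtime does before invoking the user function. value() returning null for an unseen key mirrors a ValueState without a default value.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model (not Flink's implementation) of one keyed ValueState handle
 * inside a single operator instance: one state object, but every read and
 * write is scoped to the key of the record currently being processed.
 */
final class KeyedValueStateModel<K, V> {
    private final Map<K, V> valuesByKey = new HashMap<>();
    private K currentKey;

    // Stand-in for the runtime setting the key before each record is processed.
    void setCurrentKey(K key) { this.currentKey = key; }

    // Returns null if nothing was stored yet for the current key.
    V value() { return valuesByKey.get(currentKey); }

    void update(V value) { valuesByKey.put(currentKey, value); }

    int numberOfKeys() { return valuesByKey.size(); }
}

final class PerKeyCounter {
    private PerKeyCounter() {}

    // Counts records per key through a single state handle, the way a process
    // function holding one ValueState<Long> field would.
    static KeyedValueStateModel<String, Long> count(List<String> keysInArrivalOrder) {
        KeyedValueStateModel<String, Long> state = new KeyedValueStateModel<>();
        for (String key : keysInArrivalOrder) {
            state.setCurrentKey(key);
            Long current = state.value();
            state.update(current == null ? 1L : current + 1);
        }
        return state;
    }

    public static void main(String[] args) {
        KeyedValueStateModel<String, Long> state =
                count(Arrays.asList("user-1", "user-2", "user-1"));
        state.setCurrentKey("user-1");
        System.out.println("user-1 -> " + state.value()); // prints "user-1 -> 2"
        System.out.println("distinct keys: " + state.numberOfKeys()); // prints "distinct keys: 2"
    }
}
```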

Best,
Congxian


On Fri, Apr 3, 2020 at 11:27 PM, KristoffSC <[hidden email]> wrote: