/newbie/ Share state between streams

/newbie/ Share state between streams

Andrey Utkin
Hi,

I am a newbie to Flink and have some questions about stream state. I can't find the answers in the documentation, but if I have just missed them, please link to the docs.

1. Are ValueState (and the other state classes) 'stream' scoped? That is, is it impossible to share the same state between two (or more) different pipelines in the same job:
- sourceA -> keyBy -> mapWithStateXUpdate -> print
- sourceB -> keyBy -> mapUsingStateXValue -> print

The operators 'mapWithStateXUpdate' and 'mapUsingStateXValue' will each use a separate copy of StateX, even if they use the same name and the same key for it (ValueStateDescriptor("StateX"…)). Is that right?

2. The Streaming Guide says this about the connect() operation:
==
Connect allowing for shared state between the two streams.
==

But how do I access state from operators after connect()? It returns ConnectedStreams, not a KeyedStream, so state defined earlier is not accessible. Or does it mean that calling keyBy after connect() allows defining new state based on values from both streams?

3. Is state 'operator' scoped?

- source -> keyBy -> mapWithStateX_1 -> keyBy -> mapWithStateX_2

Assume both maps try to use state with the same name (ValueStateDescriptor("StateX"…)). Despite that, they will have different copies. Is that right?


Thanks!

--
Andrey Utkin <[hidden email]>





Re: /newbie/ Share state between streams

Jamie Grier
Hi Andrey,

State is scoped by operator instance, so you cannot share state between two different operators, even if they use the same name for the state ("StateX" in your example).

Also, you are correct that the recommended way of doing what you want is to use the connect() operation to connect the two streams and then write a CoMap or CoFlatMap function. That way you process both streams with a single operator instance, so both inputs can read and update the same state.

If you call keyBy() on each stream and then connect() you will already be in a keyed context in your CoFlatMap operator.  There is no reason to call keyBy() again.

For example:

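// Key both streams by the same field before connecting them.
val keyedStream1 = stream1
  .keyBy("foo")

val keyedStream2 = stream2
  .keyBy("foo")

val connectedStream = keyedStream1.connect(keyedStream2)

// YourFunction stands for your CoFlatMapFunction implementation.
connectedStream.flatMap(YourFunction)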

In YourFunction you'll be able to access per-key state.
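
For illustration, here is a minimal sketch of what such a function might look like. It makes some assumptions: Update and Query are hypothetical event types (each with a "foo" field used as the key), and the two-argument ValueStateDescriptor constructor and open(Configuration) come from later Flink 1.x releases, so details may differ in your version.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

// Hypothetical event types, used only for this illustration.
case class Update(foo: String, value: Long)
case class Query(foo: String)

// One operator handles both inputs, so both see the same keyed state.
class YourFunction extends RichCoFlatMapFunction[Update, Query, String] {

  // Per-key state; the handle is obtained in open(), not in the constructor.
  @transient private var stateX: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    // Assumption: the (name, class) constructor of a later Flink 1.x release.
    stateX = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("StateX", classOf[java.lang.Long]))
  }

  // Elements of the first stream update the state for the current key.
  override def flatMap1(update: Update, out: Collector[String]): Unit = {
    stateX.update(update.value)
  }

  // Elements of the second stream read the state for the current key.
  override def flatMap2(query: Query, out: Collector[String]): Unit = {
    val current = stateX.value() // null if no Update was seen for this key
    if (current != null) out.collect(s"${query.foo} -> $current")
  }
}
```

With a class like this you would pass an instance, for example keyedStream1.connect(keyedStream2).flatMap(new YourFunction). The Scala DataStream API also needs import org.apache.flink.streaming.api.scala._ in scope for the implicit type information.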

Does that help?

On Fri, Jun 3, 2016 at 9:34 AM, Andrey Utkin <[hidden email]> wrote:

--

Jamie Grier
data Artisans, Director of Applications Engineering

Re: /newbie/ Share state between streams

Andrey Utkin
Hi Jamie,

Your answer was very helpful.

Thanks a lot.