Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?



vishalovercome
Suppose I have a job with 3 operators and the following job graph:

O1 => O2 // data stream partitioned by keyBy
O1 => O3 // data stream partitioned by keyBy
O2 => O3 // data stream partitioned by keyBy

If operator O3 receives inputs from two operators, and both inputs have the
same type and value for a key, will the two streams end up in the same
sub-task and therefore affect the same state variables keyed to that
particular key? Do the streams themselves have to have the same type, or is
it enough that the keys of each input stream have the same type and value?

If they're not guaranteed to affect the same state, how can we achieve
that? I would prefer to use the simple
RichMapFunction/RichFlatMapFunction for modelling my operators as opposed to
any join function.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

Matthias
Hi Vishal,
I'm not 100% sure what you're trying to do. But partitioning by key relies only on the key and the parallelism used. So, I guess, what you propose should work.
You would have to rely on some join function, though, when merging the two inputs into one operator again.
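The routing Matthias describes can be sketched as a small plain-Java model. This is not Flink code: the class name `KeyRoutingSketch`, the key `"K"`, a max parallelism of 128 and a parallelism of 4 for O3 are assumptions chosen for illustration. The model follows the two steps Flink uses for keyed streams (map the key to one of `maxParallelism` key groups, then map that key group onto a subtask), except that Flink hashes `key.hashCode()` with a murmur hash, which this sketch replaces with `Math.floorMod` for brevity.

```java
import java.util.Objects;

// Simplified model of keyBy routing; NOT Flink's actual implementation.
// Assumption: Flink additionally runs key.hashCode() through a murmur hash,
// replaced here by a plain floorMod to keep the sketch short.
final class KeyRoutingSketch {

    // Step 1: map a key to one of maxParallelism key groups.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(Objects.hashCode(key), maxParallelism);
    }

    // Step 2: map the key group onto one subtask of the receiving operator.
    // The only inputs are the key, maxParallelism and the receiver's
    // parallelism; the sending operator does not appear anywhere.
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        int keyGroup = keyGroupFor(key, maxParallelism);
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value for illustration
        int parallelismOfO3 = 4;  // assumed value for illustration
        String key = "K";         // hypothetical key produced by TK

        // A record keyed "K" arriving from O1 and one arriving from O2
        // go through exactly the same calculation.
        int viaO1 = subtaskFor(key, maxParallelism, parallelismOfO3);
        int viaO2 = subtaskFor(key, maxParallelism, parallelismOfO3);
        System.out.println(viaO1 == viaO2); // prints true
    }
}
```

Nothing in `subtaskFor` identifies the sender, which is why records keyed `K` from O1 and from O2 reach the same subtask index of O3. That alone does not make them share state, though: keyed state is scoped to a single operator (one node in the job graph), so both streams must feed the same operator.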

I hope that was helpful.
Best,
Matthias

In reply to vishalovercome (Tue, Mar 23, 2021 at 3:29 PM).

Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

vishalovercome
Let me make the example more concrete. Say O1 gets as input a data stream T1,
which it splits into two using some function, producing DataStreams of type T2
and T3, each of which is partitioned by the same key function TK. After O2
processes its stream, it may sometimes send its output (of type T4) to O3,
again using the same key function. Now I want to know whether:

1. Records with key K from stream T3 and records with key K from stream T4 end
up affecting the state variables for the same key K, or different ones. I would
think it is the same state, but wanted confirmation.
2. An explicit join is needed or not, i.e. whether the following will achieve
what I want:

result2 = T1.filter(fn2).keyBy(TK).map(richfn2).keyBy(TK).map(whatever O3 does)
result3 = T1.filter(fn3).keyBy(TK).map(whatever O3 does)





Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

Matthias
1. Yes, the same key would affect the same state variable, as long as both streams feed the same operator.
2. You need a join to have the same operator process both streams.
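Answer 2 is easiest to see on the pseudocode in the question: `result2` and `result3` each end in their own `.map(whatever O3 does)` call, so the job graph contains two separate O3 operators, and Flink's keyed state is scoped per operator as well as per key. The following plain-Java model is a hypothetical sketch, not the Flink API: `KeyedCounterOperator` stands in for O3, and counting records per key stands in for whatever O3 actually does.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Flink API): each operator owns its own keyed state,
// here a per-key counter standing in for "whatever O3 does".
final class SeparateOperatorsSketch {

    static final class KeyedCounterOperator {
        // Stand-in for keyed state (e.g. a ValueState<Long>) scoped to ONE operator.
        private final Map<String, Long> countPerKey = new HashMap<>();

        long process(String key) {
            return countPerKey.merge(key, 1L, Long::sum);
        }

        long countFor(String key) {
            return countPerKey.getOrDefault(key, 0L);
        }
    }

    public static void main(String[] args) {
        // Hypothetical records reaching O3 via O2 (T4) and directly from O1 (T3).
        List<String> t4Keys = List.of("K", "K");
        List<String> t3Keys = List.of("K");

        // result2 = ...map(richfn2).keyBy(TK).map(O3) and
        // result3 = ...keyBy(TK).map(O3) create two distinct O3 operators.
        KeyedCounterOperator o3ForResult2 = new KeyedCounterOperator();
        KeyedCounterOperator o3ForResult3 = new KeyedCounterOperator();
        t4Keys.forEach(o3ForResult2::process);
        t3Keys.forEach(o3ForResult3::process);

        // Same key K, but its state is split across the two operators.
        System.out.println(o3ForResult2.countFor("K")); // prints 2
        System.out.println(o3ForResult3.countFor("K")); // prints 1
    }
}
```

Neither operator ever sees the total of 3 records for `K`, which is the practical consequence of not merging the two streams into one operator.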

Matthias

In reply to vishalovercome (Wed, Mar 24, 2021 at 7:29 AM).

Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

David Anderson-4
For an example of a similar join implemented as a RichCoFlatMap, see [1]. For more background, the Flink docs have a tutorial [2] on how to work with connected streams.


In reply to Matthias Pohl (Wed, Mar 24, 2021 at 8:55 AM).

Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

vishalovercome
I've gone through the example as well as the documentation, and I still couldn't understand whether my use case requires joining.

1. What would happen if I didn't join?
2. As the two incoming data streams have the same type, if joining is absolutely necessary, then just a union (oneStream.union(anotherStream)) followed by a keyBy should be good enough, right?

I am asking this because I would prefer to use the simple RichMapFunction or RichFlatMapFunction as opposed to the RichCoFlatMapFunction. Thanks a lot!


Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

David Anderson-3
Yes, since the two streams have the same type, you can union the two streams, key the resulting stream, and then apply something like a RichFlatMapFunction. Or you can connect the two streams (again, they'll need to be keyed so you can use state), and apply a RichCoFlatMapFunction. You can use whichever of these approaches is simpler for your use case.
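David's two options can be sketched as a plain-Java model rather than Flink code. In Flink they would roughly correspond to `t3.union(t4).keyBy(TK).flatMap(...)` with a `RichFlatMapFunction`, and `t3.connect(t4).keyBy(TK, TK).flatMap(...)` with a `RichCoFlatMapFunction`; `t3`, `t4`, `MergedInputsSketch` and the counting logic are hypothetical. In both variants there is a single downstream operator, so records keyed `K` from either input update one shared piece of state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Flink API) of the two options. Both feed ONE operator,
// so records with the same key from either input update the same state.
// A single HashMap stands in for that operator's keyed state; in Flink, keyBy
// ensures each key's entry lives on exactly one of its subtasks.
final class MergedInputsSketch {

    // Option 1: union + keyBy + a single-input function (like RichFlatMapFunction).
    static Map<String, Long> unionThenCount(List<String> first, List<String> second) {
        // Models first.union(second). Flink does not guarantee this
        // interleaving order; the per-key count does not depend on it.
        List<String> unioned = new ArrayList<>(first);
        unioned.addAll(second);
        Map<String, Long> state = new HashMap<>();
        for (String key : unioned) {
            state.merge(key, 1L, Long::sum);
        }
        return state;
    }

    // Option 2: connect + a two-input function (like RichCoFlatMapFunction):
    // one callback per input, but both share the same keyed state.
    static final class CoCounter {
        final Map<String, Long> state = new HashMap<>();

        void fromFirstInput(String key) { state.merge(key, 1L, Long::sum); }

        void fromSecondInput(String key) { state.merge(key, 1L, Long::sum); }
    }

    public static void main(String[] args) {
        List<String> t3Keys = List.of("K");      // hypothetical records from O1
        List<String> t4Keys = List.of("K", "K"); // hypothetical records from O2

        System.out.println(unionThenCount(t3Keys, t4Keys).get("K")); // prints 3

        CoCounter co = new CoCounter();
        t3Keys.forEach(co::fromFirstInput);
        t4Keys.forEach(co::fromSecondInput);
        System.out.println(co.state.get("K")); // prints 3
    }
}
```

Union requires both inputs to have the same type, while `connect` also works when the types differ, since `RichCoFlatMapFunction` has separate `flatMap1` and `flatMap2` methods for the two inputs. With union, the order in which records from the two inputs interleave is not guaranteed, so logic in the merged operator should not depend on it.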

In reply to vishalovercome (Mon, Mar 29, 2021 at 7:56 AM).