Suppose I have a job with three operators and the following job graph:
O1 => O2 // data stream partitioned by keyBy
O1 => O3 // data stream partitioned by keyBy
O2 => O3 // data stream partitioned by keyBy
If operator O3 receives inputs from two operators, and both inputs have the same type and value for a key, will the two streams end up in the same subtask and therefore affect the same state variables keyed to that particular key? Do the streams themselves have to have the same type, or is it enough that the keys of the input streams have the same type and value? If they're not guaranteed to affect the same state, how can we achieve that? I would prefer to model my operators with a simple RichMapFunction/RichFlatMapFunction rather than a join function.
-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi Vishal,
I'm not 100% sure what you're trying to do, but partitioning by key depends only on the key and the parallelism in use. So, I guess, what you propose should work. You would have to rely on some join function, though, when merging the outputs of two operators into one operator again.
I hope that was helpful.
Best,
Matthias

On Tue, Mar 23, 2021 at 3:29 PM vishalovercome wrote:
Let me make the example more concrete. Say O1 receives a data stream T1 as input, which it splits into two using some function, producing DataStreams of type T2 and T3, each partitioned by the same key function TK. After O2 processes its stream, it may sometimes send records on to O3 (as T4), again keyed with the same key function. I want to know:
1. Whether data from stream T3 with key K and from stream T4 with key K end up affecting the state variables for the same key K, or different ones. I would think they affect the same ones, but I wanted confirmation.
2. Whether an explicit join is needed, i.e. whether this achieves what I want:
result2 = T1.filter(fn2).keyBy(TK).map(richfn2).keyBy(TK).map(whatever O3 does)
result3 = T1.filter(fn3).keyBy(TK).map(whatever O3 does)
1. Yes, the same key would affect the same state variable (within the same operator).
2. You need a join to have the same operator process both streams.
Matthias

On Wed, Mar 24, 2021 at 7:29 AM vishalovercome wrote:
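A simplified, plain-Java model of the routing behind answer 1 may help. It is a sketch under stated assumptions, not Flink's code: it models a keyed exchange as key -> key group -> subtask index, roughly following Flink's key-group assignment, but Flink additionally scrambles the key's hashCode() with a murmur hash, which this sketch replaces with a plain Math.floorMod, so real subtask indices will differ. KeyRouting, T3Event and T4Event are hypothetical names (Java 16+ records). The model shows that only the extracted key value is hashed, so records of different types with equal keys reach the same subtask index. Whether they then share state still depends on that subtask belonging to one and the same operator, which is why answer 2 says a join (union or connect) is needed.

```java
import java.util.Objects;

// Simplified, hypothetical model of how a keyed exchange picks a downstream subtask.
// Assumption: Flink applies a murmur hash to key.hashCode(); this sketch uses
// Math.floorMod on the raw hashCode instead, so indices will not match a real job.
public class KeyRouting {

    // Hypothetical record types standing in for the T3 and T4 streams in the thread.
    record T3Event(String key, int payload) {}
    record T4Event(String key, String note) {}

    // Step 1: map a key to one of maxParallelism key groups.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(Objects.hashCode(key), maxParallelism);
    }

    // Step 2: map the key group to one of `parallelism` subtasks (contiguous ranges of groups).
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return keyGroupFor(key, maxParallelism) * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        T3Event a = new T3Event("K", 1);
        T4Event b = new T4Event("K", "from O2");
        int maxParallelism = 128;
        int parallelism = 4;
        // Only the extracted key is hashed, so both records route to the same subtask
        // even though their record types differ.
        System.out.println(subtaskFor(a.key(), maxParallelism, parallelism)
                == subtaskFor(b.key(), maxParallelism, parallelism)); // true
    }
}
```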
For an example of a similar join implemented as a RichCoFlatMapFunction, see [1]. For more background, the Flink docs have a tutorial [2] on how to work with connected streams.

On Wed, Mar 24, 2021 at 8:55 AM Matthias Pohl wrote:
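The idea behind a keyed, connected two-input operator can be sketched in plain Java. This is a model, not Flink API code: CoFlatMapModel, T3Event and T4Event are hypothetical, the two methods only mirror the shape of the flatMap1/flatMap2 callbacks of Flink's CoFlatMapFunction, a List<String> stands in for the Collector, and a HashMap keyed by the record key stands in for keyed ValueState. Because both callbacks run in the same operator instance, an element from either input with key K updates the same per-key state; the two inputs may even have different types, as long as both are keyed the same way.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, plain-Java model of a keyed two-input (connected) operator.
// flatMap1/flatMap2 mirror the shape of Flink's CoFlatMapFunction callbacks; this is NOT Flink code.
public class CoFlatMapModel {

    // Hypothetical input types: they may differ, as long as both expose the same key type.
    record T3Event(String key, int value) {}
    record T4Event(String key, long value) {}

    // Stands in for keyed ValueState<Long>: one running total per key, shared by both callbacks.
    private final Map<String, Long> totalByKey = new HashMap<>();

    // Invoked for each element of the first input; `out` stands in for Flink's Collector.
    void flatMap1(T3Event e, List<String> out) {
        long total = totalByKey.merge(e.key(), (long) e.value(), Long::sum);
        out.add(e.key() + "=" + total);
    }

    // Invoked for each element of the second input; it reads and updates the same per-key state.
    void flatMap2(T4Event e, List<String> out) {
        long total = totalByKey.merge(e.key(), e.value(), Long::sum);
        out.add(e.key() + "=" + total);
    }

    public static void main(String[] args) {
        CoFlatMapModel op = new CoFlatMapModel();
        List<String> out = new ArrayList<>();
        op.flatMap1(new T3Event("K", 10), out);
        op.flatMap2(new T4Event("K", 5L), out);
        op.flatMap1(new T3Event("J", 1), out);
        System.out.println(out); // [K=10, K=15, J=1]
    }
}
```

In a real job the per-key state would be registered in open() of a RichCoFlatMapFunction and scoped automatically to the current key by Flink, rather than held in a map.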
I've gone through the example as well as the documentation, and I still can't tell whether my use case requires joining.
1. What would happen if I didn't join?
2. Since the two incoming data streams have the same type, if joining is absolutely necessary, then a union (oneStream.union(anotherStream)) followed by a keyBy should be good enough, right? I am asking because I would prefer to use the simple RichMapFunction or RichFlatMapFunction rather than the RichCoFlatMapFunction.
Thanks a lot!
Yes, since the two streams have the same type, you can union the two streams, key the resulting stream, and then apply something like a RichFlatMapFunction. Or you can connect the two streams (again, they'll need to be keyed so you can use state) and apply a RichCoFlatMapFunction. You can use whichever of these approaches is simpler for your use case.

On Mon, Mar 29, 2021 at 7:56 AM vishalovercome wrote:
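To make the difference concrete, including question 1 (what happens without a join), here is a small plain-Java model. It is a sketch under stated assumptions, not Flink code: UnionModel, CountingOperator and Event are hypothetical, a HashMap stands in for the keyed state of one RichFlatMapFunction instance, and partitioning across subtasks is ignored. withUnion models oneStream.union(anotherStream).keyBy(...).flatMap(...), where a single operator sees both inputs; withoutUnion models the two separate pipelines from the earlier message (result2 and result3), each ending in its own operator with its own state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model contrasting one operator fed by a union with two separate operators.
// A HashMap stands in for the keyed state of one RichFlatMapFunction instance; subtask
// partitioning is ignored.
public class UnionModel {

    record Event(String key, int value) {}

    // One operator instance: its state is scoped to this instance AND to the key.
    static class CountingOperator {
        final Map<String, Integer> countByKey = new HashMap<>();

        void flatMap(Event e, List<String> out) {
            int count = countByKey.merge(e.key(), 1, Integer::sum);
            out.add(e.key() + ":" + count);
        }
    }

    // Models union + keyBy + flatMap: a single operator sees both inputs.
    static List<String> withUnion(List<Event> a, List<Event> b) {
        CountingOperator op = new CountingOperator();
        List<String> out = new ArrayList<>();
        // Simple concatenation; in Flink the two inputs interleave in no guaranteed order.
        List<Event> unioned = new ArrayList<>(a);
        unioned.addAll(b);
        for (Event e : unioned) op.flatMap(e, out);
        return out;
    }

    // Models two independent pipelines ending in separate operators: state is not shared.
    static List<String> withoutUnion(List<Event> a, List<Event> b) {
        CountingOperator opA = new CountingOperator();
        CountingOperator opB = new CountingOperator();
        List<String> out = new ArrayList<>();
        for (Event e : a) opA.flatMap(e, out);
        for (Event e : b) opB.flatMap(e, out);
        return out;
    }

    public static void main(String[] args) {
        List<Event> fromO1 = List.of(new Event("K", 1));
        List<Event> fromO2 = List.of(new Event("K", 2));
        System.out.println(withUnion(fromO1, fromO2));    // [K:1, K:2]
        System.out.println(withoutUnion(fromO1, fromO2)); // [K:1, K:1]
    }
}
```

Because keyed state is scoped per operator and per key, the count for K only reaches 2 when both inputs flow through one operator. Since a union gives no ordering guarantee between its inputs, the shared function should not assume which input arrives first.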