Hi:

I am trying to understand the broadcast functionality for DataStream. The documentation says that it 'Broadcasts elements to every partition.' Since streams are unbounded, my questions are:

1. What elements get broadcast to the partitions?
2. What happens as new elements are added to the stream? Are only the new elements broadcast?
3. Since the broadcast operation returns a DataStream, can it be used in a join? How do new (and old) elements affect the join results?
4. Similarly, how does broadcast work with connected streams?

If there are some examples, please let me know.

Thanks

Mans
|
Hey Mans!
This refers to how sub tasks are connected to each other in your program. If you have a single sub task A1 and three sub tasks B1, B2, B3, broadcast will emit each incoming record at A1 to all of B1, B2, B3:

A1 --+-> B1
     +-> B2
     +-> B3

Does this help?

On Mon, Dec 25, 2017 at 7:12 PM, M Singh <[hidden email]> wrote:
> 1. What elements get broadcast to the partitions?

Each incoming element is broadcast.

> 2. What happens as new elements are added to the stream? Are only the new
> elements broadcast?

Yes, each incoming element is broadcast separately, without any history.

> 3. Since the broadcast operation returns a DataStream can it be used in join
> how do new (and old) elements affect the join results?

Yes, that should be possible. Every element is broadcast only once.

> 4. Similarly how does broadcast work with connected streams?

Similar to non-connected streams. The incoming records are emitted to every downstream partition.

– Ufuk
|
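To make the wiring described above concrete, here is a minimal sketch in plain Python. It is a toy model, not Flink code: the `broadcast` function, the `inboxes` dictionary, and the 0-based subtask numbering are all assumptions made up for illustration. It only shows that every record arriving at the upstream subtask is delivered once to each downstream subtask, in arrival order, with nothing replayed.

```python
def broadcast(records, downstream_parallelism):
    """Toy model: deliver every record to every downstream subtask."""
    # One inbox per downstream subtask (B1, B2, ... are indices 0, 1, ...).
    inboxes = {subtask: [] for subtask in range(downstream_parallelism)}
    for record in records:  # records arrive one at a time
        for subtask in inboxes:
            # Each record is sent exactly once to each subtask;
            # earlier records are never re-sent.
            inboxes[subtask].append(record)
    return inboxes


if __name__ == "__main__":
    for subtask, received in broadcast(["r1", "r2"], 3).items():
        print(f"B{subtask + 1} received {received}")
```

With two records and three downstream subtasks, each of the three inboxes ends up holding both records.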
Hi Ufuk:

Thanks for your explanation.

I can understand broadcasting a small immutable dataset to the subtasks so that it can be joined with a stream. However, I am still trying to understand how each broadcast element from a stream is used in a join operation with another stream. Is this just an optimization over joining two streams?

Also, I believe that subtasks operate on partitions of a stream and that only equi-joins are possible for streams. So why would we want to broadcast each element to all the subtasks?

Thanks again.

On Wednesday, December 27, 2017 12:52 AM, Ufuk Celebi <[hidden email]> wrote:
> Hey Mans! This refers to how sub tasks are connected to each other in your program. [...]
|
Hi,

A broadcast replicates all elements by the parallelism of the following operator, i.e., each parallel instance of the following operator receives all events of its input stream.

If the operation following a broadcast is a connect and a co-operator (and the other input is not broadcast), the operator can be used for arbitrary joins (not only equi-joins). Depending on the join semantics, the joining operator needs to keep events from one input (broadcast or non-broadcast) or from both inputs as state to be able to perform the join.

Best, Fabian

2017-12-30 23:32 GMT+01:00 M Singh <[hidden email]>:
|
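The connect-plus-co-operator pattern described above can be sketched as a small plain-Python model. This is not the Flink API: `JoinSubtask`, `on_broadcast`, `on_event`, `run`, and the modulo-based routing of events to subtasks are hypothetical names and choices used only for illustration. The sketch assumes join semantics where only the broadcast side is kept as state and the predicate is a non-equi comparison (`value > threshold`).

```python
class JoinSubtask:
    """One parallel instance of a hypothetical two-input (co-)operator."""

    def __init__(self, name):
        self.name = name
        self.thresholds = []  # state: broadcast elements seen so far

    def on_broadcast(self, threshold):
        # Broadcast input: every subtask receives and stores every element.
        self.thresholds.append(threshold)

    def on_event(self, value):
        # Partitioned input: join against stored state with a non-equi predicate.
        return [(self.name, value, t) for t in self.thresholds if value > t]


def run(interleaved, parallelism=2):
    """Feed an interleaved sequence of ("broadcast" | "event", payload) pairs."""
    subtasks = [JoinSubtask(f"B{i + 1}") for i in range(parallelism)]
    results = []
    for kind, payload in interleaved:
        if kind == "broadcast":
            for subtask in subtasks:
                subtask.on_broadcast(payload)
        else:
            # Assumption: modulo stands in for whatever partitioning is used;
            # only one subtask sees each event.
            target = subtasks[payload % parallelism]
            results.extend(target.on_event(payload))
    return results


if __name__ == "__main__":
    stream = [("broadcast", 10), ("event", 15), ("event", 4),
              ("broadcast", 3), ("event", 8)]
    print(run(stream))
```

In this run, event 4 arrives before threshold 3 and is never matched against it, because the model does not store events. Getting that match would require keeping the non-broadcast input as state too, which is the "one or both inputs as state" choice mentioned above.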
Thanks Fabian and Ufuk for your answers.

On Thursday, January 4, 2018 12:32 AM, Fabian Hueske <[hidden email]> wrote:
> Hi, A broadcast replicates all elements by the parallelism of the following operator, [...]
|