Apache Flink - broadcasting DataStream

M Singh
Hi:

I am trying to understand the broadcast functionality of DataStream, and the documentation says that it 'Broadcasts elements to every partition.'

Since streams are unbounded, my questions are:

1. What elements get broadcast to the partitions?
2. What happens as new elements are added to the stream? Are only the new elements broadcast?
3. Since the broadcast operation returns a DataStream, can it be used in a join? If so, how do new (and old) elements affect the join results?
4. Similarly, how does broadcast work with connected streams?

If there are any examples, please let me know.

Thanks

Mans

Re: Apache Flink - broadcasting DataStream

Ufuk Celebi
Hey Mans!

This refers to how subtasks are connected to each other in your
program. If you have a single subtask A1 and three subtasks B1, B2,
B3, broadcast will emit each incoming record at A1 to all of B1, B2, B3:

A1 --+-> B1
     +-> B2
     +-> B3
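The routing in this diagram can be modelled in a few lines of plain Python. This is a sketch of the behaviour only, not Flink's API: `broadcast` and `round_robin` are hypothetical helper names, and the subtask names B1..B3 simply mirror the diagram. A round-robin distribution is included for contrast.

```python
from typing import Dict, List


def broadcast(records: List[str], parallelism: int) -> Dict[str, List[str]]:
    """Hypothetical model of a broadcast connection: every record goes to every subtask."""
    channels: Dict[str, List[str]] = {f"B{i + 1}": [] for i in range(parallelism)}
    for record in records:               # records arrive one at a time at A1
        for channel in channels.values():
            channel.append(record)       # one copy per downstream subtask, nothing replayed
    return channels


def round_robin(records: List[str], parallelism: int) -> Dict[str, List[str]]:
    """Hypothetical contrast: each record goes to exactly one subtask, in turn."""
    channels: Dict[str, List[str]] = {f"B{i + 1}": [] for i in range(parallelism)}
    names = list(channels)
    for i, record in enumerate(records):
        channels[names[i % parallelism]].append(record)
    return channels


if __name__ == "__main__":
    events = ["e1", "e2", "e3", "e4"]
    print(broadcast(events, 3))
    print(round_robin(events, 3))
```

With four events and three downstream subtasks, the broadcast model delivers twelve copies (every event to every subtask, each once and in arrival order), while round-robin delivers four.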

Does this help?

On Mon, Dec 25, 2017 at 7:12 PM, M Singh wrote:
> 1. What elements get broadcast to the partitions?

Each incoming element is broadcast.

> 2. What happens as new elements are added to the stream? Are only the new
> elements broadcast?

Yes. Each incoming element is broadcast separately as it arrives, without any history; earlier elements are not re-sent.

> 3. Since the broadcast operation returns a DataStream, can it be used in a join?
> How do new (and old) elements affect the join results?

Yes, it should be. Every element is broadcast only once (one copy to each downstream subtask).

> 4. Similarly, how does broadcast work with connected streams?

The same as for non-connected streams: the incoming records are emitted to
every downstream partition.

– Ufuk

Re: Apache Flink - broadcasting DataStream

M Singh
Hi Ufuk:

Thanks for your explanation.

I can understand broadcasting a small immutable dataset to the subtasks so that they can be joined with a stream.  

However, I am still trying to understand how each broadcast element from a stream will be used in a join with another stream. Is this just an optimization over joining two streams?

Also, I believe that subtasks operate on partitions of a stream and that only equi-joins are possible for streams. So why would we want to broadcast each element to all the subtasks?
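The intuition behind "only equi-joins" can be checked with a small plain-Python model (not Flink code). It assumes integer records and a hypothetical `partition_by_key` that routes each record to subtask `key % parallelism`, standing in for key-based partitioning. Each subtask then joins only what it received, and the result is compared with a join over all records.

```python
from typing import Callable, List, Tuple

Pair = Tuple[int, int]
Predicate = Callable[[int, int], bool]


def partition_by_key(items: List[int], parallelism: int) -> List[List[int]]:
    """Hypothetical key partitioning: record goes to subtask (key % parallelism)."""
    parts: List[List[int]] = [[] for _ in range(parallelism)]
    for item in items:
        parts[item % parallelism].append(item)
    return parts


def local_join(left_parts: List[List[int]], right_parts: List[List[int]],
               predicate: Predicate) -> List[Pair]:
    """Each subtask joins only the records that were routed to it."""
    out: List[Pair] = []
    for left, right in zip(left_parts, right_parts):
        out.extend((a, b) for a in left for b in right if predicate(a, b))
    return sorted(out)


def global_join(left: List[int], right: List[int], predicate: Predicate) -> List[Pair]:
    """Reference result: compare every left record with every right record."""
    return sorted((a, b) for a in left for b in right if predicate(a, b))


def equal_keys(a: int, b: int) -> bool:
    return a == b


def less_than(a: int, b: int) -> bool:
    return a < b


if __name__ == "__main__":
    left, right, p = [1, 2, 3, 4], [2, 3, 5], 2
    lp, rp = partition_by_key(left, p), partition_by_key(right, p)
    # Equi-join: equal keys land on the same subtask, so nothing is lost.
    print(local_join(lp, rp, equal_keys) == global_join(left, right, equal_keys))
    # Non-equi join: matching pairs can sit on different subtasks and are missed.
    print(len(local_join(lp, rp, less_than)), len(global_join(left, right, less_than)))
```

In this run the equi-join check holds, while the partitioned `a < b` join finds only 3 of the 7 matching pairs, which is the limitation the question is pointing at.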

Thanks again.

In reply to Ufuk Celebi, Wednesday, December 27, 2017 12:52 AM.

Re: Apache Flink - broadcasting DataStream

Fabian Hueske-2
Hi,

A broadcast replicates every element as many times as the parallelism of the following operator, i.e., each parallel instance of the following operator receives all events of its input stream.
If the broadcast is followed by a connect and a co-operator (e.g., a CoFlatMapFunction), and the other input is not broadcast, the operator can be used for arbitrary joins, not only equi-joins. Because every parallel instance sees every broadcast element, each non-broadcast element can be matched against all of them, no matter which instance it was routed to.
Depending on the join semantics, the joining operator needs to keep events from one input (broadcast or non-broadcast) or from both inputs in state to be able to perform the join.
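This can be made concrete with a small plain-Python model (not Flink code). Assumptions: the broadcast input carries hypothetical `(lo, hi, label)` ranges, the other input carries integers distributed round-robin, and `on_broadcast` / `on_value` play roughly the roles of `flatMap1` / `flatMap2` in a CoFlatMapFunction on connected streams. The predicate `lo <= v < hi` is deliberately not an equality, and each instance keeps both inputs in state, so a match is found whichever side arrives first.

```python
from typing import List, Sequence, Tuple, Union

Range = Tuple[int, int, str]          # hypothetical broadcast record: (lo, hi, label)
Event = Tuple[str, Union[Range, int]]  # ("b", Range) or ("v", int), in arrival order


class JoinInstance:
    """One parallel instance of a hypothetical co-operator (input 1 broadcast)."""

    def __init__(self) -> None:
        self.ranges: List[Range] = []   # state for the broadcast input
        self.values: List[int] = []     # state for the non-broadcast input
        self.out: List[Tuple[int, str]] = []

    def on_broadcast(self, r: Range) -> None:
        # Store the range, then match it against values this instance already holds.
        self.ranges.append(r)
        lo, hi, label = r
        self.out.extend((v, label) for v in self.values if lo <= v < hi)

    def on_value(self, v: int) -> None:
        # Store the value, then match it against all ranges seen so far.
        self.values.append(v)
        self.out.extend((v, label) for lo, hi, label in self.ranges if lo <= v < hi)


def run(events: Sequence[Event], parallelism: int) -> List[Tuple[int, str]]:
    instances = [JoinInstance() for _ in range(parallelism)]
    next_instance = 0
    for kind, payload in events:
        if kind == "b":
            for inst in instances:      # broadcast: every instance gets every range
                inst.on_broadcast(payload)
        else:                           # non-broadcast: exactly one instance per value
            instances[next_instance].on_value(payload)
            next_instance = (next_instance + 1) % parallelism
    return sorted(pair for inst in instances for pair in inst.out)


if __name__ == "__main__":
    events = [("v", 5), ("b", (0, 10, "low")), ("v", 15),
              ("v", 7), ("b", (10, 20, "high")), ("v", 12)]
    print(run(events, 2))
```

Each value lives in exactly one instance and each range in all of them, so every matching pair is emitted exactly once and the result does not depend on the parallelism. This sketch keeps both inputs forever; a real job would need some policy to bound that state.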

Best, Fabian

In reply to M Singh, 2017-12-30 23:32 GMT+01:00.

Re: Apache Flink - broadcasting DataStream

M Singh
Thanks, Fabian and Ufuk, for your answers.

In reply to Fabian Hueske, Thursday, January 4, 2018 12:32 AM.