Apache Flink - Connected Stream with different number of partitions


M Singh
Hi:

Referring to the documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html) for ConnectedStreams:

"Connects" two data streams retaining their types. Connect allowing for shared state between the two streams.
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

If the two connected streams have a different number of partitions (e.g. someStream has 4 and otherStream has 2), how are the elements of the streams distributed to the CoMapFunction:

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});

I believe that if the second stream is broadcast, each partition of the first will get all the elements of the second. Is my understanding correct?

If the streams are not broadcast, and the first stream has 4 partitions while the second has 2, how are the elements of the second stream distributed to the partitions of the first?

Also, if the streams are not broadcast but have the same number of partitions, how are the elements distributed?

Thanks

Mans





Re: Apache Flink - Connected Stream with different number of partitions

Timo Walther
Hi Mans,

I did a quick test on my PC in which I set breakpoints in map1 and map2 (someStream has parallelism 1, otherStream 5, and my CoMapFunction 8). Elements of someStream end up in different CoMap tasks (2/8, 7/8, etc.).

So I guess the distribution uses round-robin partitioning. @Aljoscha might know more about the internals?
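Timo's observation fits round-robin distribution. The following plain-Java simulation sketches the idea; it does not call Flink, and the class and method names are hypothetical. Each upstream instance keeps its own counter and sends successive elements to downstream instances 0, 1, 2, ... in turn. Starting every counter at instance 0 is an assumption of this sketch; the real partitioner may start elsewhere, so concrete subtask assignments (such as the 2/8 and 7/8 Timo saw) can differ.

```java
import java.util.Arrays;

/**
 * Hypothetical plain-Java simulation of round-robin ("rebalance") distribution.
 * Not Flink code: it only counts where elements would land.
 */
public class RoundRobinSketch {

    /**
     * Each upstream instance emits elementsPerUpstream elements and sends them
     * round robin to the downstream instances. Returns how many elements each
     * downstream instance (0-based index) receives.
     */
    static int[] rebalance(int upstreamParallelism, int downstreamParallelism, int elementsPerUpstream) {
        int[] receivedCounts = new int[downstreamParallelism];
        for (int up = 0; up < upstreamParallelism; up++) {
            // Every upstream instance has its own counter; starting at 0 is an assumption.
            int nextChannel = 0;
            for (int e = 0; e < elementsPerUpstream; e++) {
                receivedCounts[nextChannel]++;
                nextChannel = (nextChannel + 1) % downstreamParallelism;
            }
        }
        return receivedCounts;
    }

    public static void main(String[] args) {
        // Timo's test: one upstream instance feeding a co-map with parallelism 8.
        System.out.println(Arrays.toString(rebalance(1, 8, 16))); // [2, 2, 2, 2, 2, 2, 2, 2]
        // Second input from the question: 2 upstream instances feeding 4 co-map instances.
        System.out.println(Arrays.toString(rebalance(2, 4, 6)));  // [4, 4, 2, 2]
    }
}
```

The uneven [4, 4, 2, 2] comes only from every counter starting at instance 0; with different start positions the load spreads differently. Either way, each upstream instance cycles through all downstream instances on its own, so there is no fixed pairing between the 2 upstream and 4 downstream instances.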

Regards,
Timo



On 12/31/17 at 10:38 PM, M Singh wrote:

Re: Apache Flink - Connected Stream with different number of partitions

Aljoscha Krettek
Hi,

Timo's answer is correct, but I'll try to elaborate a bit. How data is sent to downstream operations depends on several things in this case:

 - parallelism of the first input operation
 - parallelism of the second input operation
 - parallelism of the co-operation
 - transmission pattern on the first input (broadcast, rebalance, etc.)
 - transmission pattern on the second input
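The factors above can be condensed into a small decision function. This is a hypothetical sketch: the class, enum, and method names are invented for illustration and are not Flink API. It assumes the default behavior of Flink's stream graph construction: with no explicit pattern, an input edge uses forward partitioning when upstream and downstream parallelism match and rebalance (round robin) when they differ, and an explicit forward between operations of different parallelism is rejected. Each input of a co-operation is decided on its own.

```java
/** Hypothetical model of how a transmission pattern is chosen for one input edge; not Flink API. */
public class EdgePatternSketch {

    enum Pattern { FORWARD, REBALANCE, BROADCAST }

    /**
     * @param upstreamParallelism   parallelism of the input operation
     * @param downstreamParallelism parallelism of the co-operation
     * @param explicitPattern       pattern set by the user, or null if none was set
     */
    static Pattern choose(int upstreamParallelism, int downstreamParallelism, Pattern explicitPattern) {
        if (explicitPattern == null) {
            // No pattern given: keep data local when parallelisms match, otherwise round robin.
            return upstreamParallelism == downstreamParallelism ? Pattern.FORWARD : Pattern.REBALANCE;
        }
        if (explicitPattern == Pattern.FORWARD && upstreamParallelism != downstreamParallelism) {
            // Forward only pairs instance i with instance i, so the counts must match.
            throw new UnsupportedOperationException("forward requires equal parallelism");
        }
        return explicitPattern;
    }

    public static void main(String[] args) {
        // Each input of the co-operation is decided independently.
        System.out.println("first input:  " + choose(4, 4, null));              // FORWARD
        System.out.println("second input: " + choose(2, 4, null));              // REBALANCE
        System.out.println("second input: " + choose(2, 4, Pattern.BROADCAST)); // BROADCAST
    }
}
```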

Note that "streams" themselves have no parallelism: technically, there are no streams, only operations that are interconnected in a certain way.

Now, if the input parallelism and the operation parallelism are the same and you don't specify a transmission pattern, data will not be "shuffled" between the operations. If you specify broadcast or rebalance, you get that pattern instead; with broadcast, for example, each element from the input operator is sent to every instance of the downstream operation.
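The contrast between the default (no shuffle) and broadcast can be sketched for the setup from the original question, assuming the co-operation has parallelism 4: the first input has 4 instances, so by default it is forwarded, and the second input has 2 instances and is broadcast (in the DataStream API that would be requested as someStream.connect(otherStream.broadcast())). The code is a plain-Java simulation with hypothetical names, not Flink code; in a real job the arrival order of broadcast elements from different upstream instances is not fixed, whereas the sketch simply concatenates them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical simulation of forward vs. broadcast delivery into a co-operation; not Flink code. */
public class ForwardVsBroadcastSketch {

    /** Forward: downstream instance i receives exactly what upstream instance i produced. */
    static <T> List<List<T>> forward(List<List<T>> upstreamOutputs, int downstreamParallelism) {
        if (upstreamOutputs.size() != downstreamParallelism) {
            throw new IllegalArgumentException("forward requires equal parallelism");
        }
        List<List<T>> inboxes = new ArrayList<>();
        for (List<T> produced : upstreamOutputs) {
            inboxes.add(new ArrayList<>(produced)); // no shuffle: same index in, same index out
        }
        return inboxes;
    }

    /** Broadcast: every downstream instance receives everything every upstream instance produced. */
    static <T> List<List<T>> broadcast(List<List<T>> upstreamOutputs, int downstreamParallelism) {
        List<T> everything = new ArrayList<>();
        for (List<T> produced : upstreamOutputs) {
            everything.addAll(produced); // simplification: real interleaving is not fixed
        }
        List<List<T>> inboxes = new ArrayList<>();
        for (int i = 0; i < downstreamParallelism; i++) {
            inboxes.add(new ArrayList<>(everything));
        }
        return inboxes;
    }

    public static void main(String[] args) {
        // First input: 4 instances (like someStream); second input: 2 instances (like otherStream).
        List<List<Integer>> first = Arrays.asList(
                Arrays.asList(1, 5), Arrays.asList(2, 6), Arrays.asList(3, 7), Arrays.asList(4, 8));
        List<List<String>> second = Arrays.asList(Arrays.asList("a", "c"), Arrays.asList("b", "d"));

        List<List<Integer>> map1Inboxes = forward(first, 4);
        List<List<String>> map2Inboxes = broadcast(second, 4);
        for (int i = 0; i < 4; i++) {
            // 1-based indices, like the "2/8" subtask names in Timo's test.
            System.out.println("co-map " + (i + 1) + "/4: map1 <- " + map1Inboxes.get(i)
                    + ", map2 <- " + map2Inboxes.get(i));
        }
    }
}
```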

Best,
Aljoscha

On 3. Jan 2018, at 10:43, Timo Walther wrote:


Re: Apache Flink - Connected Stream with different number of partitions

M Singh
Thanks, Aljoscha and Timo, for your answers. I will try to digest the pointers you provided.

Mans


On Wednesday, January 3, 2018 3:01 AM, Aljoscha Krettek wrote:

