Forward Partitioning & same Parallelism: 1:1 communication?

Nicaz
Hello,

I have a question about forward partitioning in Flink.

If Operator A and Operator B have the same parallelism set and forward partitioning is used for events coming from instances of A and going to instances of B:

Will each instance of A send events to _exactly one_ instance of B?

That is, will all events coming from a specific instance of A go to the _same_ specific instance of B, and will _all_ instances of B be used?
Or are there any situations where an instance of A will distribute events to several different instances of B, or where two instances of A will send events to the same instance of B (possibly leaving some other instance of B unused)?

I'd be very happy if someone were able to shed some light on this issue. :-)

Thanks in advance
Nica

Re: Forward Partitioning & same Parallelism: 1:1 communication?

Márton Balassi
Dear Nica,

Yes, forward partitioning means that if subsequent operators share parallelism then the output of an upstream operator is sent to exactly one downstream operator. This makes sense for operators working on individual records, e.g. a typical map-filter pair, because as a consequence Flink may be able to collocate these operator pairs on the same physical machine.

Best,

Marton
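
A minimal sketch of the one-to-one mapping described above, written as a plain Python simulation rather than Flink code. It assumes that with equal parallelism, subtask i of the upstream operator is wired to subtask i of the downstream operator; the function name `forward_targets` is hypothetical and not part of any Flink API.

```python
def forward_targets(parallelism):
    """Map each upstream subtask index to the single downstream subtask it feeds.

    Assumption: with equal parallelism, forward partitioning pairs subtask i
    of A with subtask i of B.
    """
    return {upstream: upstream for upstream in range(parallelism)}


mapping = forward_targets(4)

# Every instance of A has exactly one target ...
assert len(mapping) == 4
# ... and every instance of B is used exactly once.
assert sorted(mapping.values()) == [0, 1, 2, 3]
```

Under that assumption, no instance of B is shared between two instances of A and none is left unused, which is the situation the original question asks about.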

Re: Forward Partitioning & same Parallelism: 1:1 communication?

Ufuk Celebi-2
Hey Marton,

out of curiosity: is this using Flink’s “point” connections underneath or is there some custom logic for streaming jobs?

What happens if operator B has 2 times the parallelism of operator A? For example if there were parallel tasks A1 and A2 and B1-B4: would A1 send to B1 *and* B2 or just B1?

– Ufuk

Re: Forward Partitioning & same Parallelism: 1:1 communication?

Márton Balassi
Hey Ufuk,

The shipping strategy name forward is shared between batch and streaming and Nica did not specify either API, so I tried to give a generic answer.

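I assume that your question is specifically about streaming. In that case: yes, streaming uses the pointwise distribution pattern. [1] Unfortunately, your concern is valid: currently streaming would leave the extra downstream operator instances idle, but Aljoscha has an open pull request that fixes this issue, among others. See the discussion here. [2]

[1] https://github.com/apache/flink/blob/master/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L320
[2] https://github.com/apache/flink/pull/988
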
Cheers,

Marton

Re: Forward Partitioning & same Parallelism: 1:1 communication?

Ufuk Celebi
Thanks :) Regarding your answer to Nica: I didn't mean to say that it was too generic or anything... it was very nice. I was just curious, that's why I asked.

Re: Forward Partitioning & same Parallelism: 1:1 communication?

Nicaz
Hey Márton, hey Ufuk,

thank you for your replies, that was very helpful!

I now have an additional question based on Márton's answer to Ufuk's question. (By the way, I'm currently working only with the streaming API, so I am more interested in answers concerning streaming than batch processing... :-) )

In the second link Márton provided [1] it says:

"This was not very transparent: When you went from low parallelism to high dop some downstream operators would never get any input."

A question below the pull request then asks "If a non parallel source is used does the user need to call rebalance to use all parallel instances of the downstream operator?" and I don't think that question was explicitly answered. The closest thing to an explicit answer is "[so far] forward was assumed. This was valid for a change of parallelism, which led to either the degenerative case of only one downstream instance receiving elements (1 to n parallelism)".

To me, that sounds as if up until right now, in a situation where operator A has lower parallelism than the following downstream operator B (for example, source A with parallelism 1 and filter B with parallelism 4), not all instances of B would receive output from A if forward partitioning is used.

Now, in the docs [2] it says:
"Forward (default)
: Forward partitioning directs the output data to the next operator on the same machine (if possible) avoiding expensive network I/O. _If there are more processing nodes than inputs or vice versa the load is distributed among the extra nodes in a round-robin fashion_. This is the default partitioner."

So far, I would have read the middle sentence as saying that when forward partitioning is used and the parallelism differs, outputs are forwarded to the next operator on the same machine where possible, with some outputs also distributed to the extra nodes round-robin. However, I've tested the setup described above (see below) and it seems that Flink uses "normal" round-robin partitioning (rebalance partitioning) when the parallelism differs: round-robin for _all_ outputs, with no "forwarding" (in the forward partitioning sense). Is that correct?

My little test: 1 Source, 4 Filters
I tried that with Flink 0.9 and, even though I did not explicitly specify any partitioning (so the default, forward, should have been used), Flink apparently uses rebalance partitioning in this case - from the log:
DEBUG StreamingJobGraphGenerator:235 Thread-1 - Parallelism set: 4 for 2
DEBUG StreamingJobGraphGenerator:235 Thread-1 - Parallelism set: 1 for 1
DEBUG StreamingJobGraphGenerator:312 Thread-1 - CONNECTED: RebalancePartitioner - 1 -> 2

The same thing happened in the reverse direction: when I went from 4 filters (B) to 1 sink (C), rebalancing was apparently used.

So that one problem (concerning downstream operators not receiving outputs when forward partitioning is used) described in the pull request is apparently already fixed in 0.9 - or does it only work correctly for the source/sink connection and not between other operators (I did not have time to try more scenarios)?

Again, I would be very happy to get some input on whether I have understood Flink's behavior correctly! :-) Thanks in advance!
Nica

[1] https://github.com/apache/flink/pull/988
[2] https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/streaming_guide.html#partitioning
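
To make the two behaviors discussed above concrete, here is a small Python simulation (not Flink code) that contrasts them for one upstream instance feeding four downstream instances. The selector names are hypothetical, and the round-robin selector is only an approximation: Flink's actual rebalance partitioner may, for instance, start at a different channel.

```python
from collections import Counter
from itertools import count


def always_first_channel(num_channels):
    """Selector that sends every record to channel 0 (the degenerate 1-to-n case).

    num_channels is accepted only to mirror round_robin's signature.
    """
    while True:
        yield 0


def round_robin(num_channels):
    """Selector that cycles through all channels, approximating rebalance."""
    for record_number in count():
        yield record_number % num_channels


def distribute(selector, num_records):
    """Count how many records each downstream instance receives."""
    return Counter(next(selector) for _ in range(num_records))


# One source instance feeding four filter instances, eight records in total.
idle_case = distribute(always_first_channel(4), 8)
rebalanced = distribute(round_robin(4), 8)

print(dict(idle_case))   # only instance 0 receives records
print(dict(rebalanced))  # all four instances receive records
```

The first case corresponds to the "only one downstream instance receiving elements" situation quoted from the pull request; the second matches the `RebalancePartitioner` line in the log.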

Re: Forward Partitioning & same Parallelism: 1:1 communication?

Aljoscha Krettek
Hi,
I looked into it. Right now, when the specified partitioner is "FORWARD", the JobGraph that is generated from the StreamGraph will have the POINT-TO-POINT pattern specified. This doesn't work, however, if the parallelism differs, so the operators will not have a POINT-TO-POINT connection in the end. This results in the "REBALANCE" behavior you observed.

My PR makes it explicit which connection pattern will be used. It will also properly set the connection to "REBALANCE" if no partitioning is specified and the parallelism of the operators is different.

I hope this helps somehow, let us know if you have any other questions. 

Cheers,
Aljoscha
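
The rule described in this reply can be sketched as a small decision function. This is an illustration in plain Python, not the code from the pull request; `choose_connection` and its parameters are hypothetical names. It assumes forward is the default when parallelism matches, as the quoted 0.9 docs state. The thread does not say what happens when forward is requested explicitly across differing parallelism, so the sketch simply passes explicit choices through.

```python
def choose_connection(explicit_partitioner, upstream_parallelism, downstream_parallelism):
    """Pick a connection pattern name for one edge of the job graph.

    explicit_partitioner is None when the user did not specify partitioning.
    """
    if explicit_partitioner is not None:
        # Assumption: an explicit choice is kept as-is; the thread does not
        # describe how explicit FORWARD with differing parallelism is handled.
        return explicit_partitioner
    if upstream_parallelism == downstream_parallelism:
        # Default case with matching parallelism: one-to-one forwarding.
        return "FORWARD"
    # No partitioning specified and parallelism differs: rebalance.
    return "REBALANCE"


# The test from the thread: source with parallelism 1, filters with parallelism 4.
print(choose_connection(None, 1, 4))
```

For the setup tested earlier in the thread (parallelism 1 into parallelism 4 with nothing specified), this yields the rebalance connection seen in the log.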
