Hey Márton, hey Ufuk,
Thank you for your replies; they were very helpful!
I now have an additional question based on Márton's answer to Ufuk's
question. (By the way, I'm currently working only with the streaming
API, so I am more interested in answers concerning streaming than in
answers concerning batch processing... :-) )
In the second link Márton provided [1], it says: "This was not very
transparent: When you went from low parallelism to high dop [degree
of parallelism] some downstream operators would never get any input."
A question below the pull request then asks: "If a non parallel
source is used does the user need to call rebalance to use all
parallel instances of the downstream operator?" I don't think that
question was explicitly answered. The closest thing to an explicit
answer is "[so far] forward was assumed. This was valid for a change
of parallelism, which led to either the degenerative case of only one
downstream instance receiving elements (1 to n parallelism)".
To me, that sounds as if, until now, in a situation where operator A
has a lower parallelism than the following downstream operator B (for
example, source A with parallelism 1 and filter B with parallelism 4),
not all instances of B would receive output from A if forward
partitioning is used.
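To make the "1 to n" case concrete, here is a minimal Java sketch of
the two routing rules as I understand them. It is a simplified,
hypothetical model (the class and method names are made up), not
Flink's actual partitioner code: "forward" sends everything from
upstream subtask i to downstream subtask i (mod n), while "rebalance"
cycles each upstream subtask through all downstream subtasks.

```java
import java.util.Arrays;

/**
 * Hypothetical, simplified model of record routing between an upstream
 * operator and a downstream operator. NOT Flink's partitioner classes.
 */
class PartitioningSketch {

    /** Forward: upstream subtask i always sends to downstream subtask i (mod n). */
    static int forward(int upstreamSubtask, int numDownstream) {
        return upstreamSubtask % numDownstream;
    }

    /** Rebalance: each upstream subtask cycles round-robin through all downstream subtasks. */
    static int rebalance(int upstreamSubtask, long recordIndex, int numDownstream) {
        return (int) ((upstreamSubtask + recordIndex) % numDownstream);
    }

    /** Returns how many records each downstream subtask receives. */
    static int[] route(int upstream, int downstream, int recordsPerUpstream, boolean useForward) {
        int[] received = new int[downstream];
        for (int u = 0; u < upstream; u++) {
            for (long r = 0; r < recordsPerUpstream; r++) {
                int target = useForward ? forward(u, downstream) : rebalance(u, r, downstream);
                received[target]++;
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // Source A (parallelism 1) feeding filter B (parallelism 4), 8 records in total.
        System.out.println("forward:   " + Arrays.toString(route(1, 4, 8, true)));  // [8, 0, 0, 0]
        System.out.println("rebalance: " + Arrays.toString(route(1, 4, 8, false))); // [2, 2, 2, 2]
    }
}
```

In this model, forward with 1 -> 4 leaves three of the four filter
instances idle, which is the degenerative case the pull request
describes.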
Now, in the docs [2] it says:
"Forward (default): Forward partitioning directs the output
data to the next operator on the same machine (if possible) avoiding
expensive network I/O. _If there are more processing nodes than
inputs or vice versa the load is distributed among the extra nodes
in a round-robin fashion_. This is the default partitioner."
Until now, I had understood the middle sentence to mean that when
forward partitioning is used and the parallelism differs, outputs are
forwarded to the next operator on the same machine where possible,
while some outputs are also distributed round-robin to the extra
nodes. However, I've tested the setup described above (see below),
and it seems that Flink uses "normal" round-robin partitioning
(rebalance partitioning) when the parallelism differs, using
round-robin for _all_ outputs and not doing any "forwarding" (in the
forward-partitioning sense). Is that correct?
My little test: 1 Source, 4 Filters
I tried this with Flink 0.9 and, even though I did not explicitly
specify any partitioning (so the default, forward, should have been
used), Flink apparently used rebalance partitioning in this case.
From the log:
DEBUG StreamingJobGraphGenerator:235 Thread-1 - Parallelism set: 4 for 2
DEBUG StreamingJobGraphGenerator:235 Thread-1 - Parallelism set: 1 for 1
DEBUG StreamingJobGraphGenerator:312 Thread-1 - CONNECTED: RebalancePartitioner - 1 -> 2
The same thing happened in reverse when I went from 4 filters (B) to
1 sink (C): rebalancing was apparently used again.
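The behavior I observed can be summarized as a small decision rule.
The following Java sketch is only my inference from the DEBUG log
above, not Flink's StreamingJobGraphGenerator code; the names are
hypothetical, and the "equal parallelism -> forward" branch is an
assumption based on the pull request's statement that forward was
assumed, since I did not log that case.

```java
/**
 * Hypothetical model of the default-partitioner choice observed in the
 * Flink 0.9 DEBUG log. An inference from observed behavior, NOT Flink code.
 */
class DefaultPartitionerSketch {

    enum Partitioner { FORWARD, REBALANCE }

    static Partitioner defaultPartitioner(int upstreamParallelism, int downstreamParallelism) {
        if (upstreamParallelism <= 0 || downstreamParallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        // Assumption: forward only when parallelism matches; otherwise fall back to rebalance.
        return upstreamParallelism == downstreamParallelism
                ? Partitioner.FORWARD
                : Partitioner.REBALANCE;
    }

    public static void main(String[] args) {
        System.out.println("source(1) -> filter(4): " + defaultPartitioner(1, 4)); // REBALANCE
        System.out.println("filter(4) -> sink(1):   " + defaultPartitioner(4, 1)); // REBALANCE
        System.out.println("filter(4) -> map(4):    " + defaultPartitioner(4, 4)); // FORWARD
    }
}
```

If this rule only applied to source/sink connections, a test with two
ordinary operators of different parallelism (e.g. a filter followed by
a map) should show a different partitioner in the log.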
So the problem described in the pull request (downstream operators
not receiving any output when forward partitioning is used) is
apparently already fixed in 0.9. Or does this only work correctly for
the source/sink connections and not between other operators? (I did
not have time to try more scenarios.)
Again, I would be very happy to hear whether I have understood
Flink's behavior correctly! :-) Thanks in advance!
Nica
[1] https://github.com/apache/flink/pull/988
[2] https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/streaming_guide.html#partitioning