Doubts about parallelism



AndreaKinn
Hi,
I read the docs about parallelism, parallel execution and job scheduling, but
I still have some doubts about parallelism.

1.
In my first try I did not set any parallelism in my code and commented out
the parallelism.default key in the flink-conf file. In this case I assumed
that Flink sets the parallelism automatically on a per-operator basis. Is this
assumption correct?

2.
In a second try I again did not set any parallelism in my code, but I set
parallelism.default: 2 in the flink-conf file.
My code has some sources, some sinks and two custom functions from an
external library supported by Flink. These don't have a setParallelism()
method, so I can't set a specific parallelism for them.
However, when I tried to run it I got the following error:

java.lang.UnsupportedOperationException: Forward partitioning does not
allow change of parallelism. Upstream operation: Learn-11 parallelism: 1,
downstream operation: Select-13 parallelism: 3 You must use another
partitioning strategy, such as broadcast, rebalance, shuffle or global.

This leads me to my second question. Am I forced to set
parallelism.default: 1 to match the parallelism of the “learn” method? In that
case I would need to set the parallelism of each Flink operator explicitly
(for example to 2) and leave the “select” parallelism at the default value (1),
since I can't set a specific parallelism on it (I can't set it to 3 as the
error suggests).

Moreover, I searched the docs a lot for the relationship between partitioning
and parallelism, but everything I read seems a bit unclear to me. Can you
explain it better?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Doubts about parallelism

Tony Wei
Hi Andrea,

For your first question, I think you are right, but the base value comes from the default value of `parallelism.default` in flink-conf.yaml [1], which is 1 when the key is not set.
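The order in which an operator's parallelism is chosen can be sketched as a small, stdlib-only Java model. It is not Flink code: the class `ParallelismResolution` and its method are hypothetical, and they only encode the documented precedence (operator level, then execution environment level, then the client's `-p` flag, then `parallelism.default` in flink-conf.yaml, whose built-in value is 1). The model ignores local execution from an IDE, where, as far as I know, the default is the number of CPU cores instead.

```java
import java.util.OptionalInt;

/**
 * Toy model (not Flink code) of the documented precedence for an operator's parallelism:
 * operator level > execution environment level > client level (-p) > parallelism.default.
 */
public class ParallelismResolution {

    /** Built-in value of parallelism.default when the key is missing from flink-conf.yaml. */
    static final int BUILT_IN_DEFAULT = 1;

    static int effectiveParallelism(OptionalInt operatorLevel,    // e.g. map(...).setParallelism(n)
                                    OptionalInt environmentLevel, // e.g. env.setParallelism(n)
                                    OptionalInt clientLevel,      // e.g. flink run -p n
                                    OptionalInt flinkConfDefault) { // parallelism.default: n
        if (operatorLevel.isPresent()) {
            return operatorLevel.getAsInt();
        }
        if (environmentLevel.isPresent()) {
            return environmentLevel.getAsInt();
        }
        if (clientLevel.isPresent()) {
            return clientLevel.getAsInt();
        }
        // Nothing set anywhere else: fall back to flink-conf.yaml, then to the built-in value.
        return flinkConfDefault.orElse(BUILT_IN_DEFAULT);
    }

    public static void main(String[] args) {
        OptionalInt unset = OptionalInt.empty();
        // First try: nothing set in code, parallelism.default commented out.
        System.out.println(effectiveParallelism(unset, unset, unset, unset)); // prints 1
        // Second try: nothing set in code, parallelism.default: 2.
        System.out.println(effectiveParallelism(unset, unset, unset, OptionalInt.of(2))); // prints 2
        // An operator-level setting wins over everything else.
        System.out.println(effectiveParallelism(OptionalInt.of(3), unset, unset, OptionalInt.of(2))); // prints 3
    }
}
```

Under this model the first try gives parallelism 1 to every operator whose parallelism is not fixed somewhere else, rather than a value picked per operator by Flink.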

For your second question, I guess you use the `forward` function between the "learn" and "select" methods. Am I right?
That exception is expected behavior: the `forward` function sends elements to the local subtask of the next operation, so it requires the upstream and downstream operations to have the same parallelism. [2]
That is why the exception advises you to change your partitioning strategy (for example broadcast, rebalance, shuffle or global), so that "learn" and "select" can run with different parallelism settings.
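To make the relation between partitioning and parallelism concrete, here is a toy model in plain Java. It is again not Flink's implementation, and `PartitioningModel`, `resolve` and `targets` are hypothetical names; the strategies correspond to the DataStream methods `forward()`, `rebalance()`, `shuffle()`, `broadcast()` and `global()`. `resolve` mirrors the rule behind the exception: with no explicit strategy, an edge uses forward when both sides have the same parallelism and rebalance otherwise, while an explicit forward across a parallelism change is rejected. `targets` shows which downstream subtasks receive a given record under each strategy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * Toy model (not Flink's implementation) of how a partitioning strategy routes records
 * from an upstream operator with parallelism `up` to a downstream operator with parallelism `down`.
 */
public class PartitioningModel {

    enum Strategy { FORWARD, REBALANCE, SHUFFLE, BROADCAST, GLOBAL }

    /**
     * Picks the strategy for one edge: with no explicit choice (null), FORWARD if both sides
     * have the same parallelism and REBALANCE otherwise. An explicit FORWARD across a
     * parallelism change is rejected, like the exception in this thread.
     */
    static Strategy resolve(Strategy explicit, int up, int down) {
        Strategy chosen = (explicit != null) ? explicit
                : (up == down ? Strategy.FORWARD : Strategy.REBALANCE);
        if (chosen == Strategy.FORWARD && up != down) {
            throw new UnsupportedOperationException(
                    "Forward partitioning does not allow change of parallelism: " + up + " -> " + down);
        }
        return chosen;
    }

    /** Downstream subtask indices that receive the n-th record (0-based) of upstream subtask `src`. */
    static List<Integer> targets(Strategy s, int src, long n, int down, Random rnd) {
        List<Integer> out = new ArrayList<>();
        switch (s) {
            case FORWARD:   // same index as the sender; only valid when up == down (see resolve)
                out.add(src);
                break;
            case REBALANCE: // round-robin over all downstream subtasks (this model starts at 0)
                out.add((int) (n % down));
                break;
            case SHUFFLE:   // uniformly random downstream subtask
                out.add(rnd.nextInt(down));
                break;
            case BROADCAST: // every downstream subtask gets a copy
                for (int i = 0; i < down; i++) {
                    out.add(i);
                }
                break;
            case GLOBAL:    // everything goes to downstream subtask 0
                out.add(0);
                break;
        }
        return out;
    }

    public static void main(String[] args) {
        // "Learn" (parallelism 1) -> "Select" (parallelism 3), no explicit strategy.
        System.out.println(resolve(null, 1, 3)); // prints REBALANCE

        // The same edge with an explicit forward is rejected.
        try {
            resolve(Strategy.FORWARD, 1, 3);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // Routes of the first four records of upstream subtask 0 into 3 downstream subtasks.
        Random rnd = new Random(42);
        Strategy[] valid = {Strategy.REBALANCE, Strategy.SHUFFLE, Strategy.BROADCAST, Strategy.GLOBAL};
        for (Strategy s : valid) {
            List<List<Integer>> routes = new ArrayList<>();
            for (long n = 0; n < 4; n++) {
                routes.add(targets(s, 0, n, 3, rnd));
            }
            System.out.println(s + " " + routes);
        }
    }
}
```

In the model, forward is the only strategy whose routing uses the sender's own index, so it is the only one that breaks when the two sides have a different number of subtasks; rebalance, shuffle, broadcast and global all address the full range of downstream subtasks.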

Hope this helps.

Best Regards,
Tony Wei


In reply to AndreaKinn's message of 2017-10-15 2:56 GMT+08:00.