Execution graph

Michele Bertoni
Hi, I was trying to run my program in the Flink web environment (the local one).
When I run it I get the graph of the planned execution, but each node shows "parallelism = 1". I think it actually runs with a parallelism of 8 (8 cores, and I always get 8 outputs).

What does that mean?
Is the display wrong, or is it really running with a parallelism of 1?

Just a note: I never call setParallelism(); I leave it automatic.

thanks
Best
Michele

Re: Execution graph

Maximilian Michels
Hi Michele,

If you don't set the parallelism, the default parallelism is used. For the visualization in the web client, a parallelism of one is used. When you run your example from your IDE, the default parallelism is set to the number of (virtual) cores of your CPU.

Moreover, Flink will currently not automatically set the parallelism in a cluster environment. It will use the default parallelism or the user-set parallelism. In your example, if you set the parallelism explicitly then it will also show up in the visualization.

Best,
Max

On Tue, Jun 30, 2015 at 7:11 AM, Michele Bertoni <[hidden email]> wrote:
> [...] in each node there is a "parallelism = 1", instead i think it runs with par = 8 [...]

Re: Execution graph

Ufuk Celebi
The web client currently does not support configuring the parallelism. There is an open issue for it, so it should be fixed soon.

---

What you can do right now:

1) Either configure the following key in flink-conf.yaml

parallelism.default: PARALLELISM

2) Or set it via the environment:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARALLELISM);

You could parse the parallelism via args.
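
A minimal sketch of the "parse the parallelism via args" suggestion, assuming the parallelism is passed as the first program argument. The class name `ParallelismArgs`, the method `parseParallelism`, and the fallback value are hypothetical and not part of Flink; the Flink call itself appears only as a comment.

```java
/** Hypothetical helper: reads the job parallelism from the first program argument. */
final class ParallelismArgs {

    /**
     * Returns args[0] parsed as a positive int, or the fallback when no argument is given.
     * Throws IllegalArgumentException for malformed or non-positive values.
     */
    static int parseParallelism(String[] args, int fallback) {
        if (args == null || args.length == 0) {
            return fallback;
        }
        int parallelism;
        try {
            parallelism = Integer.parseInt(args[0].trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Parallelism must be an integer: " + args[0], e);
        }
        if (parallelism < 1) {
            throw new IllegalArgumentException("Parallelism must be >= 1: " + parallelism);
        }
        return parallelism;
    }

    public static void main(String[] args) {
        // Fallback of 1 is an assumption made for this sketch.
        int parallelism = parseParallelism(args, 1);
        // In a Flink program the value would then be handed to the environment:
        //   env.setParallelism(parallelism);
        System.out.println("parallelism = " + parallelism);
    }
}
```

Rejecting bad input early keeps a typo on the command line from silently turning into an unexpected parallelism.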

---

If you use the Flink CLI via bin/flink you don't have to do any of this, but can do the following:

bin/flink run -p PARALLELISM xyz.jar

This will set the default parallelism for the job for you. This feature is coming up for the web client.

– Ufuk

Re: Execution graph

Fabian Hueske-2
In reply to this post by Maximilian Michels
In addition, some operators can only be run with a parallelism of 1, for example data sources based on collections and (un-grouped) all-reduces. In some cases, the parallelism of the following operators will also be set to 1 to avoid a network shuffle.

If you do:

env.fromCollection(myCollection).map(new MyMapper()).groupBy(0).reduce(new MyReduce()).writeToFile();

the data source and mapper will be run with a parallelism of 1, while the reducer and sink will be executed with the default parallelism.
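
The assignment described above can be illustrated with a toy model. This is only a sketch under stated assumptions and does not use or mirror Flink's optimizer: the class `PlanSketch`, its `Step` type, and the rule "an operator on a forward connection keeps its input's parallelism" are simplifications invented for illustration, and the default parallelism of 8 is an example value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of a linear plan; names and rules are hypothetical, not Flink internals. */
final class PlanSketch {

    /** One step of the plan. */
    static final class Step {
        final String name;
        final boolean nonParallel;   // e.g. a collection-based source
        final boolean afterShuffle;  // input is repartitioned, e.g. by groupBy

        Step(String name, boolean nonParallel, boolean afterShuffle) {
            this.name = name;
            this.nonParallel = nonParallel;
            this.afterShuffle = afterShuffle;
        }
    }

    /** Assigns a parallelism to every step, in plan order. */
    static List<Integer> assign(List<Step> plan, int defaultParallelism) {
        List<Integer> result = new ArrayList<>();
        int previous = defaultParallelism;
        for (int i = 0; i < plan.size(); i++) {
            Step step = plan.get(i);
            int parallelism;
            if (step.nonParallel) {
                parallelism = 1;                    // can only run with parallelism 1
            } else if (i > 0 && !step.afterShuffle) {
                parallelism = previous;             // forward connection: avoid a shuffle
            } else {
                parallelism = defaultParallelism;   // data is shuffled anyway
            }
            result.add(parallelism);
            previous = parallelism;
        }
        return result;
    }

    public static void main(String[] args) {
        List<Step> plan = Arrays.asList(
                new Step("fromCollection", true, false),
                new Step("map", false, false),
                new Step("groupBy + reduce", false, true),
                new Step("sink", false, false));
        System.out.println(assign(plan, 8)); // prints [1, 1, 8, 8]
    }
}
```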

Best, Fabian

2015-06-30 10:25 GMT+02:00 Maximilian Michels <[hidden email]>:
> If you don't set the parallelism, the default parallelism is used. [...]

Re: Execution graph

Michele Bertoni

Hi everybody, and thanks for the answers.

If I understood correctly, you are saying that, apart from some operations, most operators are executed at the default parallelism (which is what I expected), but the viewer will always show 1 unless something different is set via setParallelism.

Is that right?

I don't have any particular requirement; the higher the parallelism, the better.
I am able to bin my data into more groups than the number of workers in the cluster. Is it better to set the degree of parallelism explicitly, or can I leave it blank (so equal to the default)?


thanks
Michele


On 30 Jun 2015, at 10:41, Fabian Hueske <[hidden email]> wrote:
> [...] some operators can only be run with a parallelism of 1. [...]

Re: Execution graph

Maximilian Michels
Yes, the web client always shows parallelism 1. That is a bug but it does not affect the execution of your program.

If you specify the default parallelism in your Flink config, you don't have to set it in your program or via the command-line argument (-p). However, if you leave the config at its default and do not set the parallelism anywhere else, it will be 1. As you already pointed out, your programs will then not be executed in a distributed fashion.

The parallelism is set in this order:

1) default parallelism set in the config
2) default parallelism set through the command-line client
3) parallelism set directly in your program on the ExecutionEnvironment using setParallelism(...)
4) parallelism set on an individual operator using setParallelism(...)

Each stage overrides the preceding ones. So 3 will override the settings of 1-2, and 4 will override, for a particular operator, the parallelism previously set by 1-3.
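
The override order above can be sketched as a small resolution function. This is an illustrative model only, not Flink's code: `ParallelismResolution` and `resolve` are hypothetical names, and an unset level is represented here by an empty `OptionalInt`.

```java
import java.util.OptionalInt;

/** Toy model of the override order; hypothetical names, not Flink's implementation. */
final class ParallelismResolution {

    /** More specific settings override less specific ones; unset levels are skipped. */
    static int resolve(int configDefault,
                       OptionalInt commandLine,
                       OptionalInt environment,
                       OptionalInt operator) {
        int effective = configDefault;              // 1) default from the config
        if (commandLine.isPresent()) {
            effective = commandLine.getAsInt();     // 2) command-line client (-p)
        }
        if (environment.isPresent()) {
            effective = environment.getAsInt();     // 3) setParallelism on the environment
        }
        if (operator.isPresent()) {
            effective = operator.getAsInt();        // 4) setParallelism on the operator
        }
        return effective;
    }

    public static void main(String[] args) {
        OptionalInt unset = OptionalInt.empty();
        // Nothing set anywhere: the config default applies.
        System.out.println(resolve(1, unset, unset, unset));
        // -p 4 on the command line, 8 on the environment: the environment wins.
        System.out.println(resolve(1, OptionalInt.of(4), OptionalInt.of(8), unset));
        // An operator-level value of 2 overrides everything else for that operator.
        System.out.println(resolve(1, OptionalInt.of(4), OptionalInt.of(8), OptionalInt.of(2)));
    }
}
```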

Best,
Max

On Tue, Jun 30, 2015 at 4:48 PM, Michele Bertoni <[hidden email]> wrote:
> [...] the viewer will always show 1 if something different is not set via setParallelism [...]

