How does setMaxParallelism work


James Isaac
I went through the explanation of max parallelism in the official docs here:
https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly

However, I cannot figure out how Flink decides the parallelism value.
For instance, if I call setMaxParallelism(3), I see that only 1 subtask is created for my job. How did Flink decide that 1 subtask was enough?

Regards,
James
Reply | Threaded
Open this post in threaded view
|

Re: How does setMaxParallelism work

Jörn Franke
What was the input format, the input size, and the program that you tried to execute?


Re: How does setMaxParallelism work

James Isaac
I have a sample application that reads around 2 GB of CSV files, converts each record into an Avro object and sends it to Kafka.
I use a custom FileReader that reads the files in a directory.
I have set taskmanager.numberOfTaskSlots to 4.
I see that if I use setParallelism(3), 3 subtasks are created. But if I use setMaxParallelism(3), only 1 subtask is created.


Re: How does setMaxParallelism work

Nico Kruber
Hi James,
the number of subtasks used is defined by the parallelism. The max
parallelism, however, "... determines the maximum parallelism to which
you can scale operators" [1]. That is, once it is set, you can never
increase the operator's parallelism above this value, not even after
restarting your program from a savepoint. The actual parallelism can be
set per job in your program, but also in the Flink client:
flink run -p <parallelism> <jar-file> <arguments>


Nico



[1]
https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly

--
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
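To make the distinction concrete, here is a minimal plain-Java model of the rule described above. It is not Flink code, and the class and method names are hypothetical: the parallelism alone decides how many subtasks run, while the max parallelism is only an upper bound that is checked, for example, when a job is restored from a savepoint with a new parallelism. It assumes that James's job ran with a parallelism of 1 because none was set explicitly.

```java
/**
 * Hypothetical model, not Flink source code: the parallelism decides how many
 * subtasks an operator runs; the max parallelism only caps that number.
 */
public final class MaxParallelismCheck {

    /** Returns the number of subtasks, rejecting a parallelism above the cap. */
    static int subtaskCount(int parallelism, int maxParallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        if (parallelism > maxParallelism) {
            // e.g. restoring a savepoint with "flink run -p 4" when the max parallelism is 3
            throw new IllegalArgumentException(
                    "parallelism " + parallelism + " exceeds max parallelism " + maxParallelism);
        }
        return parallelism; // the max parallelism never raises this number
    }

    public static void main(String[] args) {
        // only setMaxParallelism(3), assuming the parallelism falls back to 1
        System.out.println(subtaskCount(1, 3)); // 1
        // setParallelism(3): three subtasks, still within the cap of 3
        System.out.println(subtaskCount(3, 3)); // 3
        try {
            subtaskCount(4, 3);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The point of the model is that the max parallelism never appears on the "how many subtasks" side of the calculation; it only decides which parallelism values are allowed.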



Re: How does setMaxParallelism work

James Isaac
Agreed. But how did Flink decide that it should allot 1 subtask? Why not 2 or 3?
I am trying to understand the implications of using setMaxParallelism vs. setParallelism.


Re: How does setMaxParallelism work

Nico Kruber
Flink does not derive the parallelism from your job.
There is a default parallelism (configured via parallelism.default [1],
which is 1 by default) that is used if you do not specify one yourself.


Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#common-options
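The lookup described here can be sketched as a simple fallback chain. This is a hypothetical plain-Java model, not Flink's implementation; the precedence order (operator setting, then execution environment setting, then the client's -p flag, then parallelism.default) is assumed to follow the Flink documentation on parallel execution. Note that the max parallelism is not an input at all, which is why setMaxParallelism(3) on its own leaves the job at the default of 1.

```java
import java.util.OptionalInt;

/**
 * Hypothetical model, not Flink source code: where an operator's parallelism
 * comes from when several levels may or may not be set.
 */
public final class ParallelismResolution {

    static int resolve(OptionalInt operatorLevel,    // e.g. stream.map(...).setParallelism(n)
                       OptionalInt environmentLevel, // env.setParallelism(n)
                       OptionalInt clientLevel,      // flink run -p n
                       int parallelismDefault) {     // parallelism.default in the configuration
        if (operatorLevel.isPresent()) {
            return operatorLevel.getAsInt();
        }
        if (environmentLevel.isPresent()) {
            return environmentLevel.getAsInt();
        }
        if (clientLevel.isPresent()) {
            return clientLevel.getAsInt();
        }
        return parallelismDefault;
    }

    public static void main(String[] args) {
        OptionalInt none = OptionalInt.empty();
        // only setMaxParallelism(3) was called: it plays no part here, so the default wins
        System.out.println(resolve(none, none, none, 1)); // 1
        // env.setParallelism(3) overrides the default
        System.out.println(resolve(none, OptionalInt.of(3), none, 1)); // 3
        // a per-operator setting overrides every other level
        System.out.println(resolve(OptionalInt.of(2), OptionalInt.of(3), OptionalInt.of(4), 1)); // 2
    }
}
```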


RE: How does setMaxParallelism work

NEKRASSOV, ALEXEI
Is there an auto-scaling feature in Flink, where I start with a parallelism of (for example) 1, Flink notices that I have a high volume of data to process, and it automatically increases the parallelism of the running job?

Thanks,
Alex


Re: How does setMaxParallelism work

Nico Kruber
No, currently it is up to you to decide whether and how to scale. If you
decide to scale a running Flink job, you cancel it with a savepoint and
then resubmit it from that savepoint with the new parallelism:
- flink cancel --withSavepoint <targetDirectory> <Job ID>
- flink run -p <newParallelism> --fromSavepoint <savepointPath> <jar-file> <arguments>


Nico
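The reason the max parallelism caps this kind of rescaling is that keyed state is split into a fixed number of key groups (as many as the max parallelism), and each subtask owns a contiguous range of them. The hypothetical plain-Java model below sketches that assignment under stated simplifications: the subtask formula keyGroup * parallelism / maxParallelism is modelled on Flink's key-group assignment, while taking Math.floorMod of hashCode() is a shortcut (Flink applies a murmur hash first). None of these names are Flink API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model, not Flink source code: keys map to one of maxParallelism
 * key groups, and key groups are spread over the current subtasks.
 */
public final class KeyGroupRescaling {

    /** Simplified key-to-key-group mapping (Flink hashes hashCode() with murmur first). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Index of the subtask that owns a key group at the given parallelism. */
    static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Key groups owned by each subtask for a given parallelism. */
    static List<List<Integer>> assignment(int parallelism, int maxParallelism) {
        if (parallelism < 1 || parallelism > maxParallelism) {
            throw new IllegalArgumentException("parallelism must be in [1, maxParallelism]");
        }
        List<List<Integer>> owners = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            owners.add(new ArrayList<>());
        }
        for (int keyGroup = 0; keyGroup < maxParallelism; keyGroup++) {
            owners.get(subtaskFor(keyGroup, parallelism, maxParallelism)).add(keyGroup);
        }
        return owners;
    }

    public static void main(String[] args) {
        int maxParallelism = 4;
        System.out.println(assignment(1, maxParallelism)); // [[0, 1, 2, 3]]
        System.out.println(assignment(2, maxParallelism)); // [[0, 1], [2, 3]]
        System.out.println(assignment(4, maxParallelism)); // [[0], [1], [2], [3]]
    }
}
```

Rescaling from 2 to 4 subtasks only moves whole key groups between subtasks. A parallelism of 5 with a max parallelism of 4 would leave one subtask without any key group, which is why the sketch rejects it.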
