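I went through the explanation on MaxParallelism in the official docs here: https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly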
However, I am not able to figure out how Flink decides the parallelism value. For instance, if I call setMaxParallelism(3), I see that only 1 subtask is created for my job. How did Flink decide that 1 subtask was enough?

Regards,
James
What was the input format, the size, and the program that you tried to execute?
I have a sample application that reads around 2 GB of CSV files, converts each record into an Avro object and sends it to Kafka. I use a custom FileReader that reads the files in a directory. I have set taskmanager.numberOfTaskSlots to 4. I see that if I use setParallelism(3), 3 subtasks are created. But if I use setMaxParallelism(3), only 1 subtask is created.

On Wed, Mar 28, 2018 at 12:29 PM, Jörn Franke <[hidden email]> wrote:
Hi James,
The number of subtasks being used is defined by the parallelism; the max parallelism, however, "... determines the maximum parallelism to which you can scale operators" [1]. That is, once set, you cannot ever (even after restarting your program from a savepoint) increase the operator's parallelism above this value. The actual parallelism can be set per job in your program, but also in the Flink client:

flink run -p <parallelism> <jar-file> <arguments>

Nico

[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly

--
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
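Nico's point that the max parallelism is a ceiling follows from how Flink distributes keyed state: the key space is split into as many key groups as the max parallelism, and each parallel subtask owns a contiguous range of key groups, so a parallelism above the max parallelism would leave some subtasks with nothing to own. The Python sketch below models that assignment under stated simplifications. The range arithmetic is intended to mirror Flink's key-group assignment, but the hash is a stand-in (zlib.crc32 instead of Flink's murmur hash of hashCode()), and all function names are illustrative, not Flink API.

```python
import zlib


def key_group_for(key, max_parallelism):
    """Map a key to one of `max_parallelism` key groups.

    Stand-in hash: Flink uses a murmur hash of key.hashCode(); crc32 is used
    here only so the result is deterministic across Python runs.
    """
    return zlib.crc32(str(key).encode("utf-8")) % max_parallelism


def operator_index_for_key_group(key_group, max_parallelism, parallelism):
    """Index of the subtask that owns `key_group`."""
    return key_group * parallelism // max_parallelism


def key_group_range(operator_index, max_parallelism, parallelism):
    """Inclusive (start, end) key-group range owned by one subtask.

    If start > end the range is empty, i.e. the subtask owns no keyed state.
    """
    start = (operator_index * max_parallelism + parallelism - 1) // parallelism
    end = ((operator_index + 1) * max_parallelism - 1) // parallelism
    return start, end


def assign_ranges(max_parallelism, parallelism):
    """Key-group ranges for every subtask; rejects parallelism > max parallelism."""
    if not 1 <= parallelism <= max_parallelism:
        raise ValueError(
            f"parallelism {parallelism} must be between 1 and "
            f"max parallelism {max_parallelism}"
        )
    return [key_group_range(i, max_parallelism, parallelism) for i in range(parallelism)]


if __name__ == "__main__":
    # With max parallelism 3, the job can run with 1, 2 or 3 subtasks.
    for p in (1, 2, 3):
        print(p, assign_ranges(3, p))
    # A 4th subtask would get an empty range, which is why 4 is rejected.
    print(key_group_range(3, 3, 4))
```

With a max parallelism of 3, the three key groups are split as [(0, 2)], [(0, 1), (2, 2)] and [(0, 0), (1, 1), (2, 2)] for parallelism 1, 2 and 3, while a fourth subtask would get the empty range (3, 2). Note that the max parallelism only fixes how many key groups exist; it does not choose how many subtasks run.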
Agreed. But how did Flink decide that it should allot 1 subtask? Why not 2 or 3? I am trying to understand the implications of using setMaxParallelism vs. setParallelism.

On Wed, Mar 28, 2018 at 2:58 PM, Nico Kruber <[hidden email]> wrote:
Flink does not decide the parallelism based on your job.
There is a default parallelism (configured via parallelism.default [1], which defaults to 1) that is used if you do not specify one yourself. Calling setMaxParallelism(3) alone does not set the parallelism, so your job ran with that default of 1.

Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#common-options
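The answer to "why 1 and not 2 or 3" is a precedence rule. As Flink's documentation on parallel execution describes it, a parallelism set on an operator wins over one set on the execution environment, which wins over the client's -p flag, which wins over the system-wide parallelism.default; setMaxParallelism only sets an upper bound. The Python sketch below models that resolution. The function and parameter names are hypothetical, and raising ValueError when the parallelism exceeds the max parallelism is a stand-in for Flink rejecting such a job.

```python
from typing import Optional

# Value of parallelism.default when it is not configured.
SYSTEM_DEFAULT_PARALLELISM = 1


def effective_parallelism(
    operator_parallelism: Optional[int] = None,
    env_parallelism: Optional[int] = None,
    client_parallelism: Optional[int] = None,
    system_default: int = SYSTEM_DEFAULT_PARALLELISM,
    max_parallelism: Optional[int] = None,
) -> int:
    """Resolve parallelism: operator > environment > client (-p) > system default.

    max_parallelism never raises the result; it is only checked as an upper bound.
    """
    parallelism = system_default
    # The first explicitly set value, in order of precedence, wins.
    for candidate in (operator_parallelism, env_parallelism, client_parallelism):
        if candidate is not None:
            parallelism = candidate
            break
    if max_parallelism is not None and parallelism > max_parallelism:
        raise ValueError(
            f"parallelism {parallelism} exceeds max parallelism {max_parallelism}"
        )
    return parallelism


if __name__ == "__main__":
    # James's case: only setMaxParallelism(3) was called -> default of 1.
    print(effective_parallelism(max_parallelism=3))
    # Assumed: setParallelism(3) called on the execution environment -> 3.
    print(effective_parallelism(env_parallelism=3))
    # flink run -p 2 with a max parallelism of 3 -> 2.
    print(effective_parallelism(client_parallelism=2, max_parallelism=3))
```

The three calls print 1, 3 and 2, matching the behaviour reported in the thread: setting only the max parallelism leaves the job at the default of 1.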
Is there an auto-scaling feature in Flink, where I start with a parallelism of (for example) 1, but Flink notices I have a high volume of data to process and automatically increases the parallelism of the running job?
Thanks,
Alex

-----Original Message-----
From: Nico Kruber [mailto:[hidden email]]
Sent: Wednesday, March 28, 2018 8:54 AM
To: Data Engineer <[hidden email]>
Cc: Jörn Franke <[hidden email]>; [hidden email]
Subject: Re: How does setMaxParallelism work
No, currently it is up to you to decide whether you need to scale, and how. If you decide to scale a running Flink job, you need to cancel it with a savepoint and then resubmit it from that savepoint with the new parallelism:

- flink cancel --withSavepoint <targetDirectory> <Job ID>
- flink run -p <newParallelism> --fromSavepoint <savepointPath> <jar-file> <arguments>

Nico
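The manual rescaling procedure above can be scripted. The Python sketch below only builds the argument lists for the two commands from Nico's reply (it does not execute them) and refuses a new parallelism above the job's max parallelism, since, as noted earlier in the thread, a job restarted from a savepoint cannot be scaled beyond that value. The function names and the example job ID, paths, jar name and max parallelism of 128 are hypothetical; the actual savepoint path has to be taken from the output of the cancel command, which this sketch does not parse.

```python
import shlex
from typing import List, Sequence


def cancel_with_savepoint_cmd(job_id: str, target_directory: str) -> List[str]:
    """Step 1: cancel the running job and write a savepoint to target_directory."""
    return ["flink", "cancel", "--withSavepoint", target_directory, job_id]


def restart_cmd(
    new_parallelism: int,
    max_parallelism: int,
    savepoint_path: str,
    jar_file: str,
    arguments: Sequence[str] = (),
) -> List[str]:
    """Step 2: resubmit the job from the savepoint with a new parallelism."""
    # Scaling above the max parallelism is not possible, so fail early.
    if not 1 <= new_parallelism <= max_parallelism:
        raise ValueError(
            f"new parallelism {new_parallelism} must be between 1 and {max_parallelism}"
        )
    return [
        "flink", "run",
        "-p", str(new_parallelism),
        "--fromSavepoint", savepoint_path,
        jar_file,
        *arguments,
    ]


if __name__ == "__main__":
    # Hypothetical job ID, directories and jar name (shlex.join needs Python 3.8+).
    print(shlex.join(cancel_with_savepoint_cmd("a1b2c3", "hdfs:///flink/savepoints")))
    print(shlex.join(restart_cmd(3, 128, "hdfs:///flink/savepoints/savepoint-a1b2c3",
                                 "csv-to-kafka.jar", ["--input", "/data/csv"])))
```

Keeping the command construction separate from execution makes it easy to review the exact commands before running them, for example with subprocess.run in a deployment script.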