Flink Dashboard UI Tasks hard limit

Flink Dashboard UI Tasks hard limit

Vijay Balakrishnan
Hi,

The Flink Dashboard UI seems to show a hard limit of around 18000 in the Tasks column on an Ubuntu Linux box.
I kept increasing the number of slots per TaskManager to 15, and the total number of slots increased to 705, but the number of tasks stayed capped at around 18000. Below 18000 tasks, the Flink job is able to start up.
Even though I increased the number of slots, the job still only runs when around 312 slots are in use.

taskmanager.numberOfTaskSlots: 15

What knob can I tune to increase the number of tasks?

Please find attached the Flink Dashboard UI.

TIA,


Attachment: Screen Shot 2020-05-19 at 12.15.20 PM.png (177K)
Re: Flink Dashboard UI Tasks hard limit

Vijay Balakrishnan
Hi,
I have increased the number of slots available, but the job does not use all of them and still runs into this approximate 18000-task limit. Looking into the source code, it seems to be opening a file - https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java#L203
So, do I have to tune the ulimit (or something similar) at the Ubuntu OS level to increase the number of tasks available? What confuses me is that the ulimit is per machine, but the ExecutionGraph spans many machines. Please pardon my ignorance here. Does the number of tasks equate to the number of open files? I am using 15 slots per TaskManager on AWS m5.4xlarge instances, which have 16 vCPUs.
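[Editor's note: for anyone who wants to inspect the per-process open-file limits mentioned here, a quick sketch on Ubuntu. The commands are standard shell builtins and paths, not taken from this thread; as discussed later in the thread, this limit is likely not the cause of the task cap.]

```shell
# Show the current soft and hard open-file limits for this shell's processes.
ulimit -Sn
ulimit -Hn

# Raise the soft limit for the current shell only (up to the hard limit).
# ulimit -n 65536

# Persistent per-user limits are typically configured in
# /etc/security/limits.conf (requires a re-login to take effect).
```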

TIA.

Re: Flink Dashboard UI Tasks hard limit

Xintong Song
Hi Vijay,

I don't think your problem is related to the number of open files. The parallelism of your job is decided before it actually tries to open the files. And if the OS limit on open files were reached, you would see a job execution failure, not a successful execution with lower parallelism.

Could you share some more information about your use case?
  • What kind of job are you executing? Is it a streaming or batch processing job?
  • Which Flink deployment do you use? Standalone? YARN?
  • It would be helpful if you could share the Flink logs.

Thank you~

Xintong Song




Re: Flink Dashboard UI Tasks hard limit

Vijay Balakrishnan
Hi Xintong,
Thanks for your reply. Increasing the network memory buffers (fraction, min, max) seems to increase the number of tasks slightly.

Streaming job
Standalone

Vijay


Re: Flink Dashboard UI Tasks hard limit

Xintong Song
> Increasing network memory buffers (fraction, min, max) seems to increase tasks slightly.

That's weird. I don't think the number of network memory buffers has anything to do with the task count.

Let me try to clarify a few things.

Please be aware that how many tasks a Flink job has and how many slots a Flink cluster has are two different things.
- The number of tasks is decided by your job's parallelism and topology. E.g., if your job graph has 3 vertices A, B, and C with parallelisms 2, 3, and 4 respectively, then you have 9 (2+3+4) tasks in total.
- The number of slots is decided by the number of TMs and the slots per TM.
- For streaming jobs, you have to make sure the number of slots is enough to execute all your tasks. The number of slots needed to execute your job is by default the max parallelism over your job graph's vertices. In the example above, you would need 4 slots, because that is the max of the vertices' parallelisms (2, 3, 4).
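[Editor's note: the task/slot arithmetic above can be sketched as a tiny script; the vertex parallelisms are the hypothetical A/B/C values from the example, not from the reporter's job.]

```python
# Sketch of the task/slot arithmetic described above (illustrative only).
# Hypothetical job graph with three vertices and their parallelisms.
parallelisms = {"A": 2, "B": 3, "C": 4}

# Total tasks = sum of all vertex parallelisms.
total_tasks = sum(parallelisms.values())

# Slots needed for a streaming job (with default slot sharing)
# = max parallelism over all vertices.
slots_needed = max(parallelisms.values())

print(total_tasks)   # 9
print(slots_needed)  # 4
```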

In your case, the screenshot shows that your job has 9621 tasks in total (not around 18000 - the dark box shows total tasks while the green box shows running tasks), and 600 slots are in use (658 - 58), suggesting that the max parallelism of your job graph's vertices is 600.

If you want to increase the number of tasks, you should increase your job's parallelism. There are several ways to do that.
  • In your job code (assuming you are using the DataStream API):
    • Use `StreamExecutionEnvironment#setParallelism()` to set the parallelism for all operators.
    • Use `SingleOutputStreamOperator#setParallelism()` to set the parallelism for a specific operator. (Only supported for subclasses of `SingleOutputStreamOperator`.)
  • When submitting your job, pass `-p <parallelism>` to the `flink run` command to set the parallelism for all operators.
  • Set `parallelism.default` in your `flink-conf.yaml` to set a default parallelism for your jobs. This is used for jobs that have not set a parallelism by any of the above methods.

Thank you~

Xintong Song




Re: Flink Dashboard UI Tasks hard limit

Vijay Balakrishnan
Hi Xintong,
Thanks for the excellent clarification of tasks.

I attached a sample screenshot above, but it didn't reflect the slots used or the task limit I was running into.

I am attaching my execution plan here. Please let me know how I can increase the number of tasks, i.e. the parallelism. As I increase the parallelism, I run into this bottleneck with the tasks.

BTW - https://flink.apache.org/visualizer/ is a great way to visualize this.
TIA,


Attachment: analytics-execution-plan-real.json (224K)

Re: Flink Dashboard UI Tasks hard limit

Xintong Song
Could you also explain how you set the parallelism when generating this execution plan?
I'm asking because this json file only shows the resulting execution plan. It is not clear to me what is not working as expected in your case - e.g., did you set the parallelism for an operator to 10 while the execution plan only shows 5?

Thank you~

Xintong Song




Re: Flink Dashboard UI Tasks hard limit

Vijay Balakrishnan
Thanks so much, Xintong, for guiding me through this. I looked at the Flink logs to see the errors.
I had to change taskmanager.network.memory.max to 4gb and akka.ask.timeout to 240s to increase the number of tasks.
Now I am able to increase the number of tasks (aka task vertices).

taskmanager.network.memory.fraction: 0.15
taskmanager.network.memory.max: 4gb
taskmanager.network.memory.min: 500mb
akka.ask.timeout: 240s
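[Editor's note: a rough sketch of how these three network-memory knobs interact in Flink 1.x. The 32 KB segment size is Flink's documented default, and the 20 GB TaskManager memory is a made-up figure for illustration; this is not Flink's actual sizing code.]

```python
# Network memory = fraction of the TM's memory, clamped to [min, max];
# that pool is then divided into fixed-size buffer segments (default 32 KB).
SEGMENT_SIZE = 32 * 1024  # assumed default taskmanager.memory.segment-size

def network_buffers(total_memory_bytes, fraction, min_bytes, max_bytes):
    """Number of network buffer segments the TM would allocate."""
    network_bytes = min(max(total_memory_bytes * fraction, min_bytes), max_bytes)
    return int(network_bytes // SEGMENT_SIZE)

GB = 1024 ** 3
MB = 1024 ** 2

# With the settings above on a hypothetical TM with 20 GB of memory:
# 0.15 * 20 GB = 3 GB, which lies within [500 MB, 4 GB], so ~3 GB of buffers.
print(network_buffers(20 * GB, 0.15, 500 * MB, 4 * GB))  # 98304
```

Note how the max acts as a hard cap: on a larger TM where the fraction would exceed 4gb, raising only the fraction changes nothing, which is consistent with the max being the knob that had to change here.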


Re: Flink Dashboard UI Tasks hard limit

Vijay Balakrishnan
Hi Xintong,
It looks like the issue is not fully resolved :( I am attaching 2 screenshots of the memory consumption of one of the TaskManagers.

To increase the used direct (off-heap) memory, do I change this: taskmanager.memory.task.off-heap.size: 5gb

I had increased taskmanager.network.memory.max to 24gb, which seems excessive.

One of the errors I saw in the Flink logs:

java.io.IOException: Insufficient number of network buffers: required 1, but only 0 available. The total number of network buffers is currently set to 85922 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.network.memory.fraction', 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:281)
    at org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:191)
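[Editor's note: a back-of-the-envelope view of why this buffer pool gets exhausted as parallelism grows. The per-channel and per-gate constants below are assumptions based on Flink 1.x credit-based flow control defaults (taskmanager.network.memory.buffers-per-channel: 2, floating-buffers-per-gate: 8), not figures from this thread.]

```python
# Rough estimate of network buffers needed by one all-to-all (keyBy/rebalance)
# edge, assuming Flink 1.x credit-based flow control defaults.
BUFFERS_PER_CHANNEL = 2   # exclusive buffers per input channel (assumed default)
FLOATING_PER_GATE = 8     # floating buffers per input gate (assumed default)

def buffers_for_shuffle(parallelism):
    """Each of the `parallelism` receiving tasks has one channel per sender,
    so buffer demand for a single all-to-all edge grows roughly quadratically."""
    per_gate = parallelism * BUFFERS_PER_CHANNEL + FLOATING_PER_GATE
    return parallelism * per_gate

# At parallelism 600, one shuffle alone wants 600 * (600*2 + 8) = 724800 buffers,
# already far more than the 85922 buffers reported in the error above.
print(buffers_for_shuffle(600))  # 724800
```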

TIA,


On Wed, May 27, 2020 at 9:06 AM Vijay Balakrishnan <[hidden email]> wrote:
Thanks so much, Xintong for guiding me through this. I looked at the Flink logs to see the errors.
I had to change taskmanager.network.memory.max: 4gb and akka.ask.timeout: 240s to increase the number of tasks.
Now, I am able to increase the number of Tasks/ aka Task vertices.

taskmanager.network.memory.fraction: 0.15
taskmanager.network.memory.max: 4gb
taskmanager.network.memory.min: 500mb
akka.ask.timeout: 240s

On Tue, May 26, 2020 at 8:42 PM Xintong Song <[hidden email]> wrote:
Could you also explain how do you set the parallelism when getting this execution plan?
I'm asking because this json file itself only shows the resulted execution plan. It is not clear to me what is not working as expected in your case. E.g., you set the parallelism for an operator to 10 but the execution plan only shows 5.

Thank you~

Xintong Song



On Wed, May 27, 2020 at 3:16 AM Vijay Balakrishnan <[hidden email]> wrote:
Hi Xintong,
Thanks for the excellent clarification for tasks.

I attached a sample screenshot above and din't reflect the slots used and the tasks limit I was running into in that pic.

I am attaching my Execution plan here. Please let me know how I can increase the nmber of tasks aka parallelism. As  increase the parallelism, i run into this bottleneck with the tasks.

BTW - The https://flink.apache.org/visualizer/ is a great start to see this.
TIA,

On Sun, May 24, 2020 at 7:52 PM Xintong Song <[hidden email]> wrote:
Increasing network memory buffers (fraction, min, max) seems to increase tasks slightly.
That's wired. I don't think the number of network memory buffers have anything to do with the task amount.

Let me try to clarify a few things.

Please be aware that, how many tasks a Flink job has, and how many slots a Flink cluster has, are two different things.
- The number of tasks are decided by your job's parallelism and topology. E.g., if your job graph have 3 vertices A, B and C, with parallelism 2, 3, 4 respectively. Then you would have totally 9 (2+3+4) tasks.
- The number of slots are decided by number of TMs and slots-per-TM.
- For streaming jobs, you have to make sure the number of slots is enough for executing all your tasks. The number of slots needed for executing your job is by default the max parallelism of your job graph vertices. Take the above example, you would need 4 slots, because it's the max among all the vertices' parallelisms (2, 3, 4).

In your case, the screenshot shows that you job has 9621 tasks in total (not around 18000, the dark box shows total tasks while the green box shows running tasks), and 600 slots are in use (658 - 58) suggesting that the max parallelism of your job graph vertices is 600.

If you want to increase the number of tasks, you should increase your job parallelism. There are several ways to do that.
  • In your job codes (assuming you are using DataStream API)
    • Use `StreamExecutionEnvironment#setParallelism()` to set parallelism for all operators.
    • Use `SingleOutputStreamOperator#setParallelism()` to set parallelism for a specific operator. (Only supported for subclasses of `SingleOutputStreamOperator`.)
  • When submitting your job, use `-p <parallelism>` as an argument for the `flink run` command, to set parallelism for all operators.
  • Set `parallelism.default` in your `flink-conf.yaml`, to set a default parallelism for your jobs. This will be used for jobs that have not set parallelism with neither of the above methods.

Thank you~

Xintong Song



On Sat, May 23, 2020 at 1:11 AM Vijay Balakrishnan <[hidden email]> wrote:
Hi Xintong,
Thx for your reply.  Increasing network memory buffers (fraction, min, max) seems to increase tasks slightly.

Streaming job
Standalone

Vijay

On Fri, May 22, 2020 at 2:49 AM Xintong Song <[hidden email]> wrote:
Hi Vijay,

I don't think your problem is related to the number of open files. The parallelism of your job is decided before Flink actually tries to open the files. And if the OS limit for open files were reached, you would see a job execution failure, rather than a successful execution with a lower parallelism.

Could you share some more information about your use case?
  • What kind of job are you executing? Is it a streaming or batch processing job?
  • Which Flink deployment do you use? Standalone? Yarn?
  • It would be helpful if you can share the Flink logs.

Thank you~

Xintong Song



On Wed, May 20, 2020 at 11:50 PM Vijay Balakrishnan <[hidden email]> wrote:
Hi,
I have increased the number of slots available, but the Job is not using all the slots and runs into this approximate 18000-task limit. Looking into the source code, it seems to be opening a file - https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java#L203
So, do I have to tune the ulimit or something similar at the Ubuntu OS level to increase the number of tasks available? What I am confused about is that the ulimit is per machine, but the ExecutionGraph spans many machines. Please pardon my ignorance here. Does the number of tasks equate to the number of open files? I am using 15 slots per TaskManager on AWS m5.4xlarge, which has 16 vCPUs.

TIA.

On Tue, May 19, 2020 at 3:22 PM Vijay Balakrishnan <[hidden email]> wrote:
Hi,

The Flink Dashboard UI seems to show a hard limit of around 18000 for the Tasks column on an Ubuntu Linux box.
I kept increasing the number of slots per task manager to 15, and the number of slots increased to 705, but the number of tasks
stayed at around 18000. Below 18000 tasks, the Flink Job is able to start up.
Even though I increased the number of slots, it still works when only 312 slots are being used.

taskmanager.numberOfTaskSlots: 15

What knob can I tune to increase the number of Tasks ?

Pls find attached the Flink Dashboard UI.

TIA,


Screen Shot 2020-05-27 at 5.04.17 PM.png (236K) Download Attachment
Screen Shot 2020-05-27 at 5.05.12 PM.png (287K) Download Attachment

Re: Flink Dashboard UI Tasks hard limit

Xintong Song
Ah, I guess I had misunderstood what your mean.

Below 18000 tasks, the Flink Job is able to start up.
Even though I increased the number of slots, it still works when 312 slots are being used.
When you say "it still works", I thought you meant that you increased the parallelism but the job was still executed as if the parallelism had not increased.
From your latest reply, it seems the job's parallelism is indeed increased, but then it runs into failures.

The reason you run into the "Insufficient number of network buffers" exception is that with more tasks in your job, there are more inter-task data transmission channels, and thus more memory for network buffers is needed.
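As a back-of-the-envelope sketch (not Flink's exact buffer accounting): for an all-to-all exchange such as a keyBy, each consumer subtask reads one subpartition from every producer subtask, so the channel count grows with the product of the two parallelisms, and each channel needs network buffers:

```python
def channels_for_exchange(producer_parallelism, consumer_parallelism):
    # One logical channel per (producer subtask, consumer subtask) pair
    # in an all-to-all exchange.
    return producer_parallelism * consumer_parallelism

# Doubling parallelism on both sides roughly quadruples the channels:
print(channels_for_exchange(300, 300))  # 90000
print(channels_for_exchange(600, 600))  # 360000
```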

To increase the network memory size, the following configuration options, as you already found, are related.
  • taskmanager.network.memory.fraction
  • taskmanager.network.memory.max
  • taskmanager.network.memory.min
Please be aware that `taskmanager.memory.task.off-heap.size` is not related to network memory, and is only available in Flink 1.10 and above while you're using 1.9.1 as suggested by the screenshots.

The network memory size is calculated as `min(max(some_total_value * network_fraction, network_min), network_max)`. According to the error message, your current network memory size is `85922 buffers * 32KB/buffer = 2685MB`, smaller than your "max" (4gb). That means increasing the "max" does not help in your case. It is the "fraction" that you need to increase.
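Plugging numbers into that formula shows why raising the "max" alone does nothing here (a sketch; the 18 GB total is a hypothetical figure chosen only for illustration, standing in for `some_total_value`):

```python
MB = 1024 ** 2
GB = 1024 ** 3

def network_memory_bytes(total, fraction, minimum, maximum):
    # min(max(some_total_value * network_fraction, network_min), network_max)
    return min(max(total * fraction, minimum), maximum)

# From the exception message: 85922 buffers * 32 KB each
current = 85922 * 32 * 1024
print(current // MB)  # 2685 MB -- already below the 4 GB max

# With a hypothetical 18 GB total and the settings from this thread:
print(int(network_memory_bytes(18 * GB, 0.15, 500 * MB, 4 * GB) / MB))   # 2764
# Raising the max to 24 GB does not help while the fraction stays at 0.15:
print(int(network_memory_bytes(18 * GB, 0.15, 500 * MB, 24 * GB) / MB))  # 2764
# Raising the fraction does:
print(int(network_memory_bytes(18 * GB, 0.45, 500 * MB, 24 * GB) / MB))  # 8294
```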

Thank you~

Xintong Song



On Thu, May 28, 2020 at 9:30 AM Vijay Balakrishnan <[hidden email]> wrote:
Hi Xintong,
Looks like the issue is not fully resolved :( Attaching 2 screenshots of the memory consumption of 1 of the TaskManagers.

To increase the used-up direct memory off heap, do I change this: taskmanager.memory.task.off-heap.size: 5gb

I had increased taskmanager.network.memory.max to 24gb,
which seems excessive.

One of the errors I saw in the Flink logs:

java.io.IOException: Insufficient number of network buffers: required 1, but only 0 available. The total number of network buffers is currently set to 85922 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.network.memory.fraction', 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:281)
at org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:191)

TIA,


On Wed, May 27, 2020 at 9:06 AM Vijay Balakrishnan <[hidden email]> wrote:
Thanks so much, Xintong, for guiding me through this. I looked at the Flink logs to see the errors.
I had to change taskmanager.network.memory.max to 4gb and akka.ask.timeout to 240s to increase the number of tasks.
Now, I am able to increase the number of Tasks, i.e. task vertices.

taskmanager.network.memory.fraction: 0.15
taskmanager.network.memory.max: 4gb
taskmanager.network.memory.min: 500mb
akka.ask.timeout: 240s

On Tue, May 26, 2020 at 8:42 PM Xintong Song <[hidden email]> wrote:
Could you also explain how you set the parallelism when getting this execution plan?
I'm asking because this json file only shows the resulting execution plan. It is not clear to me what is not working as expected in your case. E.g., did you set the parallelism for an operator to 10 while the execution plan only shows 5?

Thank you~

Xintong Song



On Wed, May 27, 2020 at 3:16 AM Vijay Balakrishnan <[hidden email]> wrote:
Hi Xintong,
Thanks for the excellent clarification about tasks.

I attached a sample screenshot above, but it didn't reflect the slots used and the task limit I was running into.

I am attaching my execution plan here. Please let me know how I can increase the number of tasks, i.e. the parallelism. As I increase the parallelism, I run into this bottleneck with the tasks.

BTW - The https://flink.apache.org/visualizer/ is a great start to see this.
TIA,

[earlier quoted messages trimmed; see above]


Re: Flink Dashboard UI Tasks hard limit

Vijay Balakrishnan
Thx, Xintong, for the detailed explanation of the memory fraction. I have increased the memory fraction now.

As I increase the defaultParallelism, I keep getting this error:

org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition e312b2db4d1d0c65224664f620d06f7d@c3a4fb1e56a0996d9f2ff86dac6e483f not reachable.
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
    at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
    at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connecting the channel failed: Connecting to remote task manager + '/10.9.239.218:45544' has failed. This might indicate that the remote task manager has been lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
    at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
    ... 7 more
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + '/10.9.239.218:45544' has failed. This might indicate that the remote task manager has been lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
    ... 1 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: /10.9.239.218:45544
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
    ... 6 more
Caused by: java.net.ConnectException: Connection timed out
    ... 10 more


On Wed, May 27, 2020 at 7:14 PM Xintong Song <[hidden email]> wrote:
[earlier quoted messages trimmed; see above]


Re: Flink Dashboard UI Tasks hard limit

Vijay Balakrishnan
Hi All,
The Job takes forever to start up and is now failing all the time at startup.
Physical Memory:62.1 GB
JVM Heap Size:15.0 GB
Flink Managed Memory:10.5 GB
Attached a TM screenshot.

Tried increasing the following:

taskmanager.numberOfTaskSlots: 10
parallelism.default: 1
rest.server.max-content-length: 314572800
taskmanager.network.memory.fraction: 0.45
taskmanager.network.memory.max: 24gb
taskmanager.network.memory.min: 500mb
akka.ask.timeout: 240s
cluster.evenly-spread-out-slots: true
taskmanager.network.netty.client.connectTimeoutSec: 240
taskmanager.network.detailed-metrics: true
taskmanager.network.memory.floating-buffers-per-gate: 16
akka.tcp.timeout: 30s  

There are more than enough slots. The issue seems to be in communicating over TCP with the remote TaskManagers?

Getting this exception on a TaskManager:

2020-05-31 20:37:31,436 INFO  org.apache.flink.runtime.taskmanager.Task                     - Window(TumblingEventTimeWindows(5000), EventTimeTrigger, MGroupingWindowAggregate, MGroupingAggregateWindowProcessing) (36/440) (921fe6761ce844a6850c5fc67326b221) switched from DEPLOYING to FAILED.
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition faea47916a206dc8d014694ec72ab577@95f71b39868d4e23a180ce11653dc4ca not reachable.
at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
--
at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connecting the channel failed: Connecting to remote task manager + '/10.127.106.54:33564' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
... 7 more
--
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
... 7 more
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + '/10.127.106.54:33564' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
... 1 more
--
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
... 1 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: /10.127.106.54:33564


On Fri, May 29, 2020 at 12:43 PM Vijay Balakrishnan <[hidden email]> wrote:
Thx, Xintong for the detailed explanation of memory fraction. I increased the mem fraction now.

As I increase the defaultParallelism, I keep getting this error:

org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition e312b2db4d1d0c65224664f620d06f7d@c3a4fb1e56a0996d9f2ff86dac6e483f not reachable.
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
    at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
    at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connecting the channel failed: Connecting to remote task manager + '/10.9.239.218:45544' has failed. This might indicate that the remote task manager has been lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
    at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
    ... 7 more
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + '/10.9.239.218:45544' has failed. This might indicate that the remote task manager has been lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
    ... 1 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: /10.9.239.218:45544
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
    ... 6 more
Caused by: java.net.ConnectException: Connection timed out
    ... 10 more


On Wed, May 27, 2020 at 7:14 PM Xintong Song <[hidden email]> wrote:
Ah, I guess I had misunderstood what you meant.

Below 18000 tasks, the Flink Job is able to start up.
Even though I increased the number of slots, it still works when 312 slots are being used.
When you said "it still works", I thought you meant that even though you increased the parallelism, the job was still executed as if the parallelism had not been increased.
From your latest reply, it seems the job's parallelism is indeed increased, but it then runs into failures.

The reason you run into the "Insufficient number of network buffers" exception is that with more tasks in your job, more inter-task data transmission channels are needed, and thus more memory for network buffers.

To increase the network memory size, the following configuration options, as you already found, are related.
  • taskmanager.network.memory.fraction
  • taskmanager.network.memory.max
  • taskmanager.network.memory.min
Please be aware that `taskmanager.memory.task.off-heap.size` is not related to network memory, and is only available in Flink 1.10 and above while you're using 1.9.1 as suggested by the screenshots.

The network memory size is calculated as `min(max(some_total_value * network_fraction, network_min), network_max)`. According to the error message, your current network memory size is `85922 buffers * 32KB/buffer = 2685MB`, smaller than your "max" (4gb). That means increasing the "max" does not help in your case. It is the "fraction" that you need to increase.
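As a quick sanity check, the formula above can be reproduced in a few lines (a sketch, not Flink's actual code; the 18 GB total used for the fraction calculation is assumed purely for illustration):

```python
def network_memory_bytes(total_bytes, fraction, min_bytes, max_bytes):
    """min(max(total * fraction, network_min), network_max), as described above."""
    return min(max(total_bytes * fraction, min_bytes), max_bytes)

MB = 1024 ** 2
GB = 1024 ** 3
BUFFER_SIZE = 32 * 1024  # 32 KB per network buffer

# The error message reported 85922 buffers of 32 KB each:
current = 85922 * BUFFER_SIZE
print(round(current / MB))  # 2685 MB -- below the configured 4 GB "max"

# While the pool is below "max", only raising the fraction grows it further:
total = 18 * GB  # hypothetical total memory used for the fraction calculation
for fraction in (0.15, 0.30, 0.45):
    size = network_memory_bytes(total, fraction, 500 * MB, 4 * GB)
    print(f"fraction={fraction}: {round(size / MB)} MB")
```

This shows why increasing "max" alone did not help: the current pool (~2685 MB) is below "max" (4 GB), so the fraction is the binding term.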

Thank you~

Xintong Song



On Thu, May 28, 2020 at 9:30 AM Vijay Balakrishnan <[hidden email]> wrote:
Hi Xintong,
Looks like the issue is not fully resolved :( Attaching two screenshots of the memory consumption of one of the TaskManagers.

To increase the direct (off-heap) memory used, do I change this: taskmanager.memory.task.off-heap.size: 5gb

I had increased taskmanager.network.memory.max: 24gb, which seems excessive.

One of the errors I saw in the Flink logs:

java.io.IOException: Insufficient number of network buffers: required 1, but only 0 available. The total number of network buffers is currently set to 85922 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.network.memory.fraction', 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:281)
at org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:191)

TIA,


On Wed, May 27, 2020 at 9:06 AM Vijay Balakrishnan <[hidden email]> wrote:
Thanks so much, Xintong, for guiding me through this. I looked at the Flink logs to see the errors.
I had to change taskmanager.network.memory.max: 4gb and akka.ask.timeout: 240s to increase the number of tasks.
Now, I am able to increase the number of tasks (task vertices).

taskmanager.network.memory.fraction: 0.15
taskmanager.network.memory.max: 4gb
taskmanager.network.memory.min: 500mb
akka.ask.timeout: 240s

On Tue, May 26, 2020 at 8:42 PM Xintong Song <[hidden email]> wrote:
Could you also explain how you set the parallelism when getting this execution plan?
I'm asking because this json file only shows the resulting execution plan. It is not clear to me what is not working as expected in your case. E.g., you set the parallelism for an operator to 10 but the execution plan only shows 5.

Thank you~

Xintong Song



On Wed, May 27, 2020 at 3:16 AM Vijay Balakrishnan <[hidden email]> wrote:
Hi Xintong,
Thanks for the excellent clarification for tasks.

I attached a sample screenshot above, but it didn't reflect the slots used and the task limit I was running into.

I am attaching my execution plan here. Please let me know how I can increase the number of tasks, i.e., the parallelism. As I increase the parallelism, I run into this bottleneck with the tasks.

BTW - The https://flink.apache.org/visualizer/ is a great start to see this.
TIA,

On Sun, May 24, 2020 at 7:52 PM Xintong Song <[hidden email]> wrote:
Increasing network memory buffers (fraction, min, max) seems to increase tasks slightly.
That's weird. I don't think the number of network memory buffers has anything to do with the task count.

Let me try to clarify a few things.

Please be aware that how many tasks a Flink job has and how many slots a Flink cluster has are two different things.
- The number of tasks is decided by your job's parallelism and topology. E.g., if your job graph has 3 vertices A, B and C, with parallelism 2, 3, 4 respectively, then you would have 9 (2+3+4) tasks in total.
- The number of slots is decided by the number of TMs and slots-per-TM.
- For streaming jobs, you have to make sure the number of slots is enough for executing all your tasks. The number of slots needed for executing your job is by default the max parallelism of your job graph vertices. In the above example, you would need 4 slots, because that is the max among all the vertices' parallelisms (2, 3, 4).
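The arithmetic above can be sketched in a few lines (illustrative numbers from the A/B/C example, not from the actual job):

```python
# Parallelism of each job-graph vertex, per the A/B/C example above
parallelism = {"A": 2, "B": 3, "C": 4}

# Total tasks: the sum of all vertex parallelisms
total_tasks = sum(parallelism.values())
print(total_tasks)  # 9

# Slots needed (with default slot sharing): the max vertex parallelism
slots_needed = max(parallelism.values())
print(slots_needed)  # 4
```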

In your case, the screenshot shows that your job has 9621 tasks in total (not around 18000; the dark box shows total tasks while the green box shows running tasks), and 600 slots are in use (658 - 58), suggesting that the max parallelism of your job graph vertices is 600.

If you want to increase the number of tasks, you should increase your job parallelism. There are several ways to do that.
  • In your job codes (assuming you are using DataStream API)
    • Use `StreamExecutionEnvironment#setParallelism()` to set parallelism for all operators.
    • Use `SingleOutputStreamOperator#setParallelism()` to set parallelism for a specific operator. (Only supported for subclasses of `SingleOutputStreamOperator`.)
  • When submitting your job, use `-p <parallelism>` as an argument for the `flink run` command, to set parallelism for all operators.
  • Set `parallelism.default` in your `flink-conf.yaml` to set a default parallelism for your jobs. This will be used for jobs that have not set parallelism with any of the above methods.
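For reference, the last option is a one-line change (the value 600 is just an example matching the slot usage discussed above):

```yaml
# flink-conf.yaml -- fallback parallelism for jobs that set none explicitly
parallelism.default: 600
```

Equivalently, `flink run -p 600 <your-job.jar>` sets it for a single submission without touching the configuration file.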

Thank you~

Xintong Song



On Sat, May 23, 2020 at 1:11 AM Vijay Balakrishnan <[hidden email]> wrote:
Hi Xintong,
Thx for your reply.  Increasing network memory buffers (fraction, min, max) seems to increase tasks slightly.

Streaming job
Standalone

Vijay

On Fri, May 22, 2020 at 2:49 AM Xintong Song <[hidden email]> wrote:
Hi Vijay,

I don't think your problem is related to the number of open files. The parallelism of your job is decided before Flink actually tries to open the files. And if the OS limit on open files were reached, you would see a job execution failure, instead of a successful execution with a lower parallelism.

Could you share some more information about your use case?
  • What kind of job are you executing? Is it a streaming or batch processing job?
  • Which Flink deployment do you use? Standalone? Yarn?
  • It would be helpful if you can share the Flink logs.

Thank you~

Xintong Song



On Wed, May 20, 2020 at 11:50 PM Vijay Balakrishnan <[hidden email]> wrote:
Hi,
I have increased the number of slots available, but the job is not using all the slots and runs into this approximate 18000-task limit. Looking into the source code, it seems to be opening a file - https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java#L203
So, do I have to tune the ulimit or something similar at the Ubuntu OS level to increase the number of tasks available? What I am confused about is that the ulimit is per machine, but the ExecutionGraph spans many machines. Please pardon my ignorance here. Does the number of tasks equate to the number of open files? I am using 15 slots per TaskManager on AWS m5.4xlarge, which has 16 vCPUs.
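For completeness, the per-machine limits in question can be inspected like this (though, as noted elsewhere in the thread, tasks are not open files, so this is unlikely to be the bottleneck):

```shell
# Soft limit on open file descriptors for the current user on this machine
ulimit -n

# Hard limit (the ceiling the soft limit can be raised to)
ulimit -Hn
```

Since the limit is per machine, each TaskManager host would need to be checked separately.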

TIA.

On Tue, May 19, 2020 at 3:22 PM Vijay Balakrishnan <[hidden email]> wrote:
Hi,

The Flink Dashboard UI seems to show a hard limit for the Tasks column around 18000 on an Ubuntu Linux box.
I kept increasing the number of slots per task manager to 15, and the number of slots increased to 705, but the task count stayed capped at around 18000. Below 18000 tasks, the Flink job is able to start up.
Even though I increased the number of slots, it still works when 312 slots are being used.

taskmanager.numberOfTaskSlots: 15

What knob can I tune to increase the number of Tasks ?

Pls find attached the Flink Dashboard UI.

TIA,


Screen Shot 2020-05-31 at 1.56.02 PM.png (246K) Download Attachment

Re: Flink Dashboard UI Tasks hard limit

Xintong Song
Hi Vijay,

The error message suggests that another task manager (10.127.106.54) is not responding. This could happen when the remote task manager has failed or is under severe GC pressure. You would need to find the log of the remote task manager to understand what is happening.

Thank you~

Xintong Song



On Mon, Jun 1, 2020 at 4:57 AM Vijay Balakrishnan <[hidden email]> wrote:
Hi All,
The job takes forever to start up and is now failing to start every time.
Physical Memory:62.1 GB
JVM Heap Size:15.0 GB
Flink Managed Memory:10.5 GB
Attached a TM screenshot.

Tried increasing the following:

taskmanager.numberOfTaskSlots: 10
parallelism.default: 1
rest.server.max-content-length: 314572800
taskmanager.network.memory.fraction: 0.45
taskmanager.network.memory.max: 24gb
taskmanager.network.memory.min: 500mb
akka.ask.timeout: 240s
cluster.evenly-spread-out-slots: true
taskmanager.network.netty.client.connectTimeoutSec: 240
taskmanager.network.detailed-metrics: true
taskmanager.network.memory.floating-buffers-per-gate: 16
akka.tcp.timeout: 30s  

There are more than enough slots. The issue seems to be communicating over TCP with remote TaskManagers?

Getting this exception on a TaskManager:

2020-05-31 20:37:31,436 INFO  org.apache.flink.runtime.taskmanager.Task                     - Window(TumblingEventTimeWindows(5000), EventTimeTrigger, MGroupingWindowAggregate, MGroupingAggregateWindowProcessing) (36/440) (921fe6761ce844a6850c5fc67326b221) switched from DEPLOYING to FAILED.
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition faea47916a206dc8d014694ec72ab577@95f71b39868d4e23a180ce11653dc4ca not reachable.
at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
--
at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connecting the channel failed: Connecting to remote task manager + '/10.127.106.54:33564' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
... 7 more
--
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
... 7 more
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + '/10.127.106.54:33564' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
... 1 more
--
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
... 1 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: /10.127.106.54:33564


On Fri, May 29, 2020 at 12:43 PM Vijay Balakrishnan <[hidden email]> wrote:
Thx, Xintong for the detailed explanation of memory fraction. I increased the mem fraction now.

As I increase the defaultParallelism, I keep getting this error:

org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition e312b2db4d1d0c65224664f620d06f7d@c3a4fb1e56a0996d9f2ff86dac6e483f not reachable.
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
    at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
    at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connecting the channel failed: Connecting to remote task manager + '/10.9.239.218:45544' has failed. This might indicate that the remote task manager has been lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
    at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
    ... 7 more
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + '/10.9.239.218:45544' has failed. This might indicate that the remote task manager has been lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
    ... 1 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: /10.9.239.218:45544
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
    ... 6 more
Caused by: java.net.ConnectException: Connection timed out
    ... 10 more


On Wed, May 27, 2020 at 7:14 PM Xintong Song <[hidden email]> wrote:
Ah, I guess I had misunderstood what your mean.

Below 18000 tasks, the Flink Job is able to start up.
Even though I increased the number of slots, it still works when 312 slots are being used.
When you say "it still works", I thought that you increased the parallelism the job was sill executed as the parallelism was not increased.
From your latest reply, it seems the job's parallelism is indeed increased, but then it runs into failures.

The reason you run into the "Insufficient number of network buffers" exception, is that with more tasks in your job, more inter-task data transmission channels, thus memory for network buffers, are needed.

To increase the network memory size, the following configuration options, as you already found, are related.
  • taskmanager.network.memory.fraction
  • taskmanager.network.memory.max
  • taskmanager.network.memory.min
Please be aware that `taskmanager.memory.task.off-heap.size` is not related to network memory, and is only available in Flink 1.10 and above while you're using 1.9.1 as suggested by the screenshots.

The network memory size is calculated as `min(max(some_total_value * network_fraction, network_min), network_max)`. According to the error message, your current network memory size is `85922 buffers * 32KB/buffer = 2685MB`, smaller than your "max" (4gb). That means increasing the "max" does not help in your case. It is the "fraction" that you need to increase.

Thank you~

Xintong Song



On Thu, May 28, 2020 at 9:30 AM Vijay Balakrishnan <[hidden email]> wrote:
Hi Xintong,
Looks like the issue is not fully resolved :( Attaching 2 screenshots of the memory consumption of 1 of the TaskManagers.

To increase the used up Direct memory off heap,Do I change this:   taskmanager.memory.task.off-heap.size: 5gb

I had increased the taskmanager.network.memory.max: 24gb
which seems excessive.

1 of the errors I saw in the Flink logs:

java.io.IOException: Insufficient number of network buffers: required 1, but only 0 available. The total number of network buffers is currently set to 85922 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.network.memory.fraction', 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:281)
at org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:191)

TIA,


On Wed, May 27, 2020 at 9:06 AM Vijay Balakrishnan <[hidden email]> wrote:
Thanks so much, Xintong for guiding me through this. I looked at the Flink logs to see the errors.
I had to change taskmanager.network.memory.max: 4gb and akka.ask.timeout: 240s to increase the number of tasks.
Now, I am able to increase the number of Tasks/ aka Task vertices.

taskmanager.network.memory.fraction: 0.15
taskmanager.network.memory.max: 4gb
taskmanager.network.memory.min: 500mb
akka.ask.timeout: 240s

On Tue, May 26, 2020 at 8:42 PM Xintong Song <[hidden email]> wrote:
Could you also explain how do you set the parallelism when getting this execution plan?
I'm asking because this json file itself only shows the resulted execution plan. It is not clear to me what is not working as expected in your case. E.g., you set the parallelism for an operator to 10 but the execution plan only shows 5.

Thank you~

Xintong Song



On Wed, May 27, 2020 at 3:16 AM Vijay Balakrishnan <[hidden email]> wrote:
Hi Xintong,
Thanks for the excellent clarification for tasks.

I attached a sample screenshot above and din't reflect the slots used and the tasks limit I was running into in that pic.

I am attaching my Execution plan here. Please let me know how I can increase the nmber of tasks aka parallelism. As  increase the parallelism, i run into this bottleneck with the tasks.

BTW - The https://flink.apache.org/visualizer/ is a great start to see this.
TIA,

On Sun, May 24, 2020 at 7:52 PM Xintong Song <[hidden email]> wrote:
Increasing network memory buffers (fraction, min, max) seems to increase tasks slightly.
That's wired. I don't think the number of network memory buffers have anything to do with the task amount.

Let me try to clarify a few things.

Please be aware that, how many tasks a Flink job has, and how many slots a Flink cluster has, are two different things.
- The number of tasks are decided by your job's parallelism and topology. E.g., if your job graph have 3 vertices A, B and C, with parallelism 2, 3, 4 respectively. Then you would have totally 9 (2+3+4) tasks.
- The number of slots are decided by number of TMs and slots-per-TM.
- For streaming jobs, you have to make sure the number of slots is enough for executing all your tasks. The number of slots needed for executing your job is by default the max parallelism of your job graph vertices. Take the above example, you would need 4 slots, because it's the max among all the vertices' parallelisms (2, 3, 4).

In your case, the screenshot shows that your job has 9621 tasks in total (not around 18000; the dark box shows total tasks while the green box shows running tasks), and 600 slots are in use (658 - 58), suggesting that the max parallelism of your job graph's vertices is 600.

If you want to increase the number of tasks, you should increase your job parallelism. There are several ways to do that.
  • In your job codes (assuming you are using DataStream API)
    • Use `StreamExecutionEnvironment#setParallelism()` to set parallelism for all operators.
    • Use `SingleOutputStreamOperator#setParallelism()` to set parallelism for a specific operator. (Only supported for subclasses of `SingleOutputStreamOperator`.)
  • When submitting your job, use `-p <parallelism>` as an argument for the `flink run` command, to set parallelism for all operators.
  • Set `parallelism.default` in your `flink-conf.yaml` to set a default parallelism for your jobs. This will be used for jobs that have not set parallelism with any of the above methods.
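These options do not stack arbitrarily: the more specific setting wins (operator-level over environment-level over `-p` over `parallelism.default`). A tiny sketch of that precedence (illustrative only, not Flink code):

```python
def effective_parallelism(operator=None, env=None, cli=None, config_default=1):
    # Most specific setting wins: operator-level setParallelism() beats the
    # environment-level setting, which beats `flink run -p`, which beats the
    # parallelism.default entry in flink-conf.yaml.
    for value in (operator, env, cli):
        if value is not None:
            return value
    return config_default

print(effective_parallelism(operator=10, env=5))  # prints 10
print(effective_parallelism(cli=300))             # prints 300
print(effective_parallelism())                    # prints 1
```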

Thank you~

Xintong Song



On Sat, May 23, 2020 at 1:11 AM Vijay Balakrishnan <[hidden email]> wrote:
Hi Xintong,
Thx for your reply.  Increasing network memory buffers (fraction, min, max) seems to increase tasks slightly.

Streaming job
Standalone

Vijay

On Fri, May 22, 2020 at 2:49 AM Xintong Song <[hidden email]> wrote:
Hi Vijay,

I don't think your problem is related to the number of open files. The parallelism of your job is decided before Flink actually tries to open the files. And if the OS limit on open files were reached, you would see a job execution failure, not a successful execution with a lower parallelism.

Could you share some more information about your use case?
  • What kind of job are you executing? Is it a streaming or batch processing job?
  • Which Flink deployment do you use? Standalone? Yarn?
  • It would be helpful if you can share the Flink logs.

Thank you~

Xintong Song



On Wed, May 20, 2020 at 11:50 PM Vijay Balakrishnan <[hidden email]> wrote:
Hi,
I have increased the number of slots available, but the job is not using all the slots and runs into this approximate 18000-task limit. Looking into the source code, it seems to be opening a file - https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java#L203
So, do I have to tune the ulimit or something similar at the Ubuntu OS level to increase the number of tasks available? What I am confused about is that the ulimit is per machine, but the ExecutionGraph spans many machines. Please pardon my ignorance here. Does the number of tasks equate to the number of open files? I am using 15 slots per TaskManager on AWS m5.4xlarge, which has 16 vCPUs.

TIA.

On Tue, May 19, 2020 at 3:22 PM Vijay Balakrishnan <[hidden email]> wrote:
Hi,

The Flink Dashboard UI seems to show a hard limit of around 18000 in the Tasks column on an Ubuntu Linux box.
I kept increasing the number of slots per task manager up to 15, and the total number of slots increased to 705, but the number of tasks
stayed at around 18000. Below 18000 tasks, the Flink job is able to start up.
Even though I increased the number of slots, it still works when only 312 slots are being used.

taskmanager.numberOfTaskSlots: 15

What knob can I tune to increase the number of tasks?

Pls find attached the Flink Dashboard UI.

TIA,

Reply | Threaded
Open this post in threaded view
|

Re: Flink Dashboard UI Tasks hard limit

Vijay Balakrishnan
Thx a ton, Xintong. 
I am using this configuration now:
taskmanager.numberOfTaskSlots: 14
rest.server.max-content-length: 314572800
taskmanager.network.memory.fraction: 0.45
taskmanager.network.memory.max: 24gb
taskmanager.network.memory.min: 500mb
akka.ask.timeout: 240s
cluster.evenly-spread-out-slots: true
akka.tcp.timeout: 240s
taskmanager.network.request-backoff.initial: 5000
taskmanager.network.request-backoff.max: 30000
web.timeout: 1000000

I still get an error on startup when loading the Flink jar. It resolves itself after failing on the first few tries; this is where taskmanager.network.request-backoff.initial: 5000 helped a little bit. I would like to get this job starting successfully on the first try. Also attaching a screenshot of the error on job failure.
Exception:
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition ce6b601e14b959de21d8351a6c5cf70c@1f2cd0d827586a4bc7b6f40ad2609db1 not reachable.
at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
...
Caused by: java.io.IOException: Connecting the channel failed: Connecting to remote task manager + '/10.128.49.96:43060' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
... 7 more
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + '/10.128.49.96:43060' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
...
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: /10.128.49.96:43060
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
... 6 more
Caused by: java.net.ConnectException: Connection timed out
... 10 more

TIA,



On Sun, May 31, 2020 at 8:08 PM Xintong Song <[hidden email]> wrote:
Hi Vijay,

The error message suggests that another task manager (10.127.106.54) is not responding. This could happen when the remote task manager has failed or under severe GC pressure. You would need to find the log of the remote task manager to understand what is happening.

Thank you~

Xintong Song



On Mon, Jun 1, 2020 at 4:57 AM Vijay Balakrishnan <[hidden email]> wrote:
Hi All,
The job takes forever to start up and is now failing to start up every time.
Physical Memory:62.1 GB
JVM Heap Size:15.0 GB
Flink Managed Memory:10.5 GB
Attached a TM screenshot.

Tried increasing the following:

taskmanager.numberOfTaskSlots: 10
parallelism.default: 1
rest.server.max-content-length: 314572800
taskmanager.network.memory.fraction: 0.45
taskmanager.network.memory.max: 24gb
taskmanager.network.memory.min: 500mb
akka.ask.timeout: 240s
cluster.evenly-spread-out-slots: true
taskmanager.network.netty.client.connectTimeoutSec: 240
taskmanager.network.detailed-metrics: true
taskmanager.network.memory.floating-buffers-per-gate: 16
akka.tcp.timeout: 30s  

There are more than enough slots. The issue seems to be with communicating over TCP with the remote task managers?

Getting this exception on a TaskManager:

2020-05-31 20:37:31,436 INFO  org.apache.flink.runtime.taskmanager.Task                     - Window(TumblingEventTimeWindows(5000), EventTimeTrigger, MGroupingWindowAggregate, MGroupingAggregateWindowProcessing) (36/440) (921fe6761ce844a6850c5fc67326b221) switched from DEPLOYING to FAILED.
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition faea47916a206dc8d014694ec72ab577@95f71b39868d4e23a180ce11653dc4ca not reachable.
at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
--
at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connecting the channel failed: Connecting to remote task manager + '/10.127.106.54:33564' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
... 7 more
--
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
... 7 more
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + '/10.127.106.54:33564' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
... 1 more
--
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
... 1 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: /10.127.106.54:33564


On Fri, May 29, 2020 at 12:43 PM Vijay Balakrishnan <[hidden email]> wrote:
Thx, Xintong for the detailed explanation of memory fraction. I increased the mem fraction now.

As I increase the defaultParallelism, I keep getting this error:

org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition e312b2db4d1d0c65224664f620d06f7d@c3a4fb1e56a0996d9f2ff86dac6e483f not reachable.
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
    at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
    at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connecting the channel failed: Connecting to remote task manager + '/10.9.239.218:45544' has failed. This might indicate that the remote task manager has been lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
    at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
    ... 7 more
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + '/10.9.239.218:45544' has failed. This might indicate that the remote task manager has been lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
    ... 1 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: /10.9.239.218:45544
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
    ... 6 more
Caused by: java.net.ConnectException: Connection timed out
    ... 10 more


On Wed, May 27, 2020 at 7:14 PM Xintong Song <[hidden email]> wrote:
Ah, I guess I had misunderstood what your mean.

Below 18000 tasks, the Flink Job is able to start up.
Even though I increased the number of slots, it still works when 312 slots are being used.
When you said "it still works", I thought that although you increased the parallelism, the job was still executed as if the parallelism had not been increased.
From your latest reply, it seems the job's parallelism is indeed increased, but then it runs into failures.

The reason you run into the "Insufficient number of network buffers" exception is that with more tasks in your job, more inter-task data transmission channels are needed, and thus more memory for network buffers.
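To see why buffer demand grows quickly, consider one all-to-all shuffle (e.g. a keyBy) between two vertices: each consumer task has one input channel per producer task, so the channel count grows with the product of the parallelisms. A rough estimate, assuming the Flink 1.9 defaults of 2 exclusive buffers per channel and 8 floating buffers per gate (an illustrative sketch, not Flink code):

```python
def buffers_needed(producers, consumers,
                   buffers_per_channel=2, floating_per_gate=8):
    # One all-to-all shuffle gives producers * consumers channels in total.
    # Assumed defaults: taskmanager.network.memory.buffers-per-channel = 2,
    # taskmanager.network.memory.floating-buffers-per-gate = 8.
    channels = producers * consumers
    return channels * buffers_per_channel + consumers * floating_per_gate

# Doubling the parallelism roughly quadruples the buffer demand:
print(buffers_needed(300, 300))  # prints 182400
print(buffers_needed(600, 600))  # prints 724800
```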

To increase the network memory size, the following configuration options, as you already found, are related.
  • taskmanager.network.memory.fraction
  • taskmanager.network.memory.max
  • taskmanager.network.memory.min
Please be aware that `taskmanager.memory.task.off-heap.size` is not related to network memory, and is only available in Flink 1.10 and above while you're using 1.9.1 as suggested by the screenshots.

The network memory size is calculated as `min(max(some_total_value * network_fraction, network_min), network_max)`. According to the error message, your current network memory size is `85922 buffers * 32KB/buffer = 2685MB`, smaller than your "max" (4gb). That means increasing the "max" does not help in your case. It is the "fraction" that you need to increase.
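Plugging the numbers from the error message into that formula shows why raising "max" alone cannot help (a quick back-of-the-envelope check; the example totals are assumptions for illustration):

```python
MB = 1024 ** 2
GB = 1024 ** 3

def network_memory(total, fraction, min_bytes, max_bytes):
    # Flink's network pool size: min(max(total * fraction, min), max).
    return min(max(int(total * fraction), min_bytes), max_bytes)

# Current pool from the error message: 85922 buffers of 32 KiB each.
current = 85922 * 32 * 1024
print(current // MB)  # prints 2685 (MiB), below the configured 4 GiB "max"

# So raising "fraction" is what grows the pool, until it hits "max":
print(network_memory(18 * GB, 0.15, 500 * MB, 4 * GB) // GB)  # prints 2
print(network_memory(18 * GB, 0.45, 500 * MB, 4 * GB) // GB)  # prints 4
```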

Thank you~

Xintong Song



On Thu, May 28, 2020 at 9:30 AM Vijay Balakrishnan <[hidden email]> wrote:
Hi Xintong,
Looks like the issue is not fully resolved :( Attaching 2 screenshots of the memory consumption of 1 of the TaskManagers.

To increase the used-up direct memory off-heap, do I change this: taskmanager.memory.task.off-heap.size: 5gb?

I had increased taskmanager.network.memory.max to 24gb,
which seems excessive.

1 of the errors I saw in the Flink logs:

java.io.IOException: Insufficient number of network buffers: required 1, but only 0 available. The total number of network buffers is currently set to 85922 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.network.memory.fraction', 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:281)
at org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:191)

TIA,


On Wed, May 27, 2020 at 9:06 AM Vijay Balakrishnan <[hidden email]> wrote:
Thanks so much, Xintong for guiding me through this. I looked at the Flink logs to see the errors.
I had to change taskmanager.network.memory.max: 4gb and akka.ask.timeout: 240s to increase the number of tasks.
Now I am able to increase the number of tasks, aka task vertices.

taskmanager.network.memory.fraction: 0.15
taskmanager.network.memory.max: 4gb
taskmanager.network.memory.min: 500mb
akka.ask.timeout: 240s

On Tue, May 26, 2020 at 8:42 PM Xintong Song <[hidden email]> wrote:
Could you also explain how you set the parallelism when getting this execution plan?
I'm asking because this json file only shows the resulting execution plan. It is not clear to me what is not working as expected in your case. E.g., did you set the parallelism for an operator to 10, but the execution plan only shows 5?

Thank you~

Xintong Song



On Wed, May 27, 2020 at 3:16 AM Vijay Balakrishnan <[hidden email]> wrote:
Hi Xintong,
Thanks for the excellent clarification for tasks.

I attached a sample screenshot above, but it didn't reflect the slots used and the tasks limit I was running into.

I am attaching my execution plan here. Please let me know how I can increase the number of tasks, i.e. the parallelism. As I increase the parallelism, I run into this bottleneck with the tasks.

BTW - the https://flink.apache.org/visualizer/ tool is a great start for seeing this.
TIA,

On Sun, May 24, 2020 at 7:52 PM Xintong Song <[hidden email]> wrote:
Increasing network memory buffers (fraction, min, max) seems to increase tasks slightly.
That's weird. I don't think the number of network memory buffers has anything to do with the number of tasks.

Let me try to clarify a few things.

Please be aware that how many tasks a Flink job has and how many slots a Flink cluster has are two different things.
- The number of tasks is decided by your job's parallelism and topology. E.g., if your job graph has 3 vertices A, B and C, with parallelism 2, 3 and 4 respectively, then you would have 9 (2+3+4) tasks in total.
- The number of slots is decided by the number of TMs and slots-per-TM.
- For streaming jobs, you have to make sure the number of slots is enough to execute all your tasks. The number of slots needed to execute your job is by default the max parallelism across your job graph's vertices. Taking the above example, you would need 4 slots, because that is the max among the vertices' parallelisms (2, 3, 4).

In your case, the screenshot shows that your job has 9621 tasks in total (not around 18000; the dark box shows total tasks while the green box shows running tasks), and 600 slots are in use (658 - 58), suggesting that the max parallelism of your job graph's vertices is 600.

If you want to increase the number of tasks, you should increase your job parallelism. There are several ways to do that.
  • In your job codes (assuming you are using DataStream API)
    • Use `StreamExecutionEnvironment#setParallelism()` to set parallelism for all operators.
    • Use `SingleOutputStreamOperator#setParallelism()` to set parallelism for a specific operator. (Only supported for subclasses of `SingleOutputStreamOperator`.)
  • When submitting your job, use `-p <parallelism>` as an argument for the `flink run` command, to set parallelism for all operators.
  • Set `parallelism.default` in your `flink-conf.yaml` to set a default parallelism for your jobs. This will be used for jobs that have not set parallelism with any of the above methods.

Thank you~

Xintong Song



On Sat, May 23, 2020 at 1:11 AM Vijay Balakrishnan <[hidden email]> wrote:
Hi Xintong,
Thx for your reply.  Increasing network memory buffers (fraction, min, max) seems to increase tasks slightly.

Streaming job
Standalone

Vijay

On Fri, May 22, 2020 at 2:49 AM Xintong Song <[hidden email]> wrote:
Hi Vijay,

I don't think your problem is related to the number of open files. The parallelism of your job is decided before Flink actually tries to open the files. And if the OS limit on open files were reached, you would see a job execution failure, not a successful execution with a lower parallelism.

Could you share some more information about your use case?
  • What kind of job are you executing? Is it a streaming or batch processing job?
  • Which Flink deployment do you use? Standalone? Yarn?
  • It would be helpful if you can share the Flink logs.

Thank you~

Xintong Song



On Wed, May 20, 2020 at 11:50 PM Vijay Balakrishnan <[hidden email]> wrote:
Hi,
I have increased the number of slots available, but the job is not using all the slots and runs into this approximate 18000-task limit. Looking into the source code, it seems to be opening a file - https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java#L203
So, do I have to tune the ulimit or something similar at the Ubuntu OS level to increase the number of tasks available? What I am confused about is that the ulimit is per machine, but the ExecutionGraph spans many machines. Please pardon my ignorance here. Does the number of tasks equate to the number of open files? I am using 15 slots per TaskManager on AWS m5.4xlarge, which has 16 vCPUs.

TIA.

On Tue, May 19, 2020 at 3:22 PM Vijay Balakrishnan <[hidden email]> wrote:
Hi,

The Flink Dashboard UI seems to show a hard limit of around 18000 in the Tasks column on an Ubuntu Linux box.
I kept increasing the number of slots per task manager up to 15, and the total number of slots increased to 705, but the number of tasks
stayed at around 18000. Below 18000 tasks, the Flink job is able to start up.
Even though I increased the number of slots, it still works when only 312 slots are being used.

taskmanager.numberOfTaskSlots: 15

What knob can I tune to increase the number of tasks?

Pls find attached the Flink Dashboard UI.

TIA,


Screen Shot 2020-06-03 at 7.46.28 AM.png (249K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Flink Dashboard UI Tasks hard limit

Xintong Song
Hi Vijay,

From the information you provided (the configurations, error message & screenshot), I'm not able to find out what the problem is or how to resolve it.

The error message comes from a healthy task manager, who discovered that another task manager is not responding. We would need to look into the log of the task manager that is not responding to understand what's wrong with it.

Thank you~

Xintong Song



On Fri, Jun 5, 2020 at 6:06 AM Vijay Balakrishnan <[hidden email]> wrote:
Thx a ton, Xintong. 
I am using this configuration now:
taskmanager.numberOfTaskSlots: 14
rest.server.max-content-length: 314572800
taskmanager.network.memory.fraction: 0.45
taskmanager.network.memory.max: 24gb
taskmanager.network.memory.min: 500mb
akka.ask.timeout: 240s
cluster.evenly-spread-out-slots: true
akka.tcp.timeout: 240s
taskmanager.network.request-backoff.initial: 5000
taskmanager.network.request-backoff.max: 30000
web.timeout: 1000000

I still get an error on startup when loading the Flink jar. It resolves itself after failing on the first few tries; this is where taskmanager.network.request-backoff.initial: 5000 helped a little bit. I would like to get this job starting successfully on the first try. Also attaching a screenshot of the error on job failure.
Exception:
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition ce6b601e14b959de21d8351a6c5cf70c@1f2cd0d827586a4bc7b6f40ad2609db1 not reachable.
at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
...
Caused by: java.io.IOException: Connecting the channel failed: Connecting to remote task manager + '/10.128.49.96:43060' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
... 7 more
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + '/10.128.49.96:43060' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
...
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: /10.128.49.96:43060
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
... 6 more
Caused by: java.net.ConnectException: Connection timed out
... 10 more

TIA,



On Sun, May 31, 2020 at 8:08 PM Xintong Song <[hidden email]> wrote:
Hi Vijay,

The error message suggests that another task manager (10.127.106.54) is not responding. This could happen when the remote task manager has failed or under severe GC pressure. You would need to find the log of the remote task manager to understand what is happening.

Thank you~

Xintong Song



On Mon, Jun 1, 2020 at 4:57 AM Vijay Balakrishnan <[hidden email]> wrote:
Hi All,
The job takes forever to start up and is now failing to start up every time.
Physical Memory:62.1 GB
JVM Heap Size:15.0 GB
Flink Managed Memory:10.5 GB
Attached a TM screenshot.

Tried increasing the following:

taskmanager.numberOfTaskSlots: 10
parallelism.default: 1
rest.server.max-content-length: 314572800
taskmanager.network.memory.fraction: 0.45
taskmanager.network.memory.max: 24gb
taskmanager.network.memory.min: 500mb
akka.ask.timeout: 240s
cluster.evenly-spread-out-slots: true
taskmanager.network.netty.client.connectTimeoutSec: 240
taskmanager.network.detailed-metrics: true
taskmanager.network.memory.floating-buffers-per-gate: 16
akka.tcp.timeout: 30s  

There are more than enough slots. The issue seems to be with communicating over TCP with the remote TaskManagers?

Getting this exception on a TaskManager:

2020-05-31 20:37:31,436 INFO  org.apache.flink.runtime.taskmanager.Task                     - Window(TumblingEventTimeWindows(5000), EventTimeTrigger, MGroupingWindowAggregate, MGroupingAggregateWindowProcessing) (36/440) (921fe6761ce844a6850c5fc67326b221) switched from DEPLOYING to FAILED.
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition faea47916a206dc8d014694ec72ab577@95f71b39868d4e23a180ce11653dc4ca not reachable.
at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connecting the channel failed: Connecting to remote task manager + '/10.127.106.54:33564' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
... 7 more
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + '/10.127.106.54:33564' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
... 1 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: /10.127.106.54:33564


On Fri, May 29, 2020 at 12:43 PM Vijay Balakrishnan <[hidden email]> wrote:
Thx, Xintong for the detailed explanation of memory fraction. I increased the mem fraction now.

As I increase the defaultParallelism, I keep getting this error:

org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition e312b2db4d1d0c65224664f620d06f7d@c3a4fb1e56a0996d9f2ff86dac6e483f not reachable.
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
    at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
    at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connecting the channel failed: Connecting to remote task manager + '/10.9.239.218:45544' has failed. This might indicate that the remote task manager has been lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
    at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
    ... 7 more
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + '/10.9.239.218:45544' has failed. This might indicate that the remote task manager has been lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
    ... 1 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: /10.9.239.218:45544
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
    ... 6 more
Caused by: java.net.ConnectException: Connection timed out
    ... 10 more


On Wed, May 27, 2020 at 7:14 PM Xintong Song <[hidden email]> wrote:
Ah, I guess I had misunderstood what your mean.

Below 18000 tasks, the Flink Job is able to start up.
Even though I increased the number of slots, it still works when 312 slots are being used.
When you said "it still works", I thought you meant that although you increased the parallelism, the job still executed as if the parallelism had not been increased.
From your latest reply, it seems the job's parallelism is indeed increased, but then it runs into failures.

The reason you run into the "Insufficient number of network buffers" exception is that with more tasks in your job, more inter-task data transmission channels are needed, and thus more memory for network buffers.
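As a rough illustration of why buffer demand grows so fast (this is a back-of-the-envelope sketch, not Flink's exact internal accounting), consider a single all-to-all edge such as the keyBy into the 440-parallel window operator from the log above, using the Flink 1.9 defaults of 2 exclusive buffers per channel, 8 floating buffers per gate, and 32 KB segments — the channel-count model and defaults here are assumptions for illustration:

```python
# Back-of-the-envelope estimate (assumed defaults, not Flink's exact
# accounting): receive-side network buffers for one all-to-all edge.
def shuffle_buffers(upstream, downstream,
                    buffers_per_channel=2, floating_per_gate=8):
    channels = upstream * downstream  # every sender feeds every receiver
    # Exclusive buffers per channel plus a floating pool per input gate.
    return channels * buffers_per_channel + downstream * floating_per_gate

buffers = shuffle_buffers(440, 440)  # parallelism 440, as in the log above
segment_bytes = 32 * 1024            # default network buffer segment size
print(buffers)                                    # 390720
print(round(buffers * segment_bytes / 2**30, 1))  # ~11.9 GiB for this one edge
```

The quadratic `upstream * downstream` term is what makes the buffer demand explode as parallelism is increased.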

To increase the network memory size, the following configuration options, as you already found, are related.
  • taskmanager.network.memory.fraction
  • taskmanager.network.memory.max
  • taskmanager.network.memory.min
Please be aware that `taskmanager.memory.task.off-heap.size` is not related to network memory, and is only available in Flink 1.10 and above while you're using 1.9.1 as suggested by the screenshots.

The network memory size is calculated as `min(max(some_total_value * network_fraction, network_min), network_max)`. According to the error message, your current network memory size is `85922 buffers * 32KB/buffer = 2685MB`, smaller than your "max" (4gb). That means increasing the "max" does not help in your case. It is the "fraction" that you need to increase.
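Plugging the numbers from the error message into that clamping formula (a plain-Python sketch; the `total` below is back-derived from the current size purely for illustration, not read from Flink's actual memory configuration):

```python
# Sketch of the formula above: min(max(total * fraction, min), max).
def network_memory(total, fraction, lo, hi):
    return min(max(total * fraction, lo), hi)

MB, GB = 1024**2, 1024**3

current = 85922 * 32 * 1024  # buffers * segment size from the error message
print(round(current / MB))   # 2685 MB, well below the 4 GB "max"

total = current / 0.15       # back-derived total, for illustration only
print(round(network_memory(total, 0.15, 500 * MB, 24 * GB) / MB))  # 2685
print(round(network_memory(total, 0.45, 500 * MB, 24 * GB) / MB))  # 8055
```

With the "max" already above the unclamped value, only raising the fraction changes the result.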

Thank you~

Xintong Song



On Thu, May 28, 2020 at 9:30 AM Vijay Balakrishnan <[hidden email]> wrote:
Hi Xintong,
Looks like the issue is not fully resolved :( Attaching 2 screenshots of the memory consumption of 1 of the TaskManagers.

To increase the direct (off-heap) memory being used up, do I change this:   taskmanager.memory.task.off-heap.size: 5gb

I had increased the taskmanager.network.memory.max: 24gb
which seems excessive.

1 of the errors I saw in the Flink logs:

java.io.IOException: Insufficient number of network buffers: required 1, but only 0 available. The total number of network buffers is currently set to 85922 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.network.memory.fraction', 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:281)
at org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:191)

TIA,


On Wed, May 27, 2020 at 9:06 AM Vijay Balakrishnan <[hidden email]> wrote:
Thanks so much, Xintong for guiding me through this. I looked at the Flink logs to see the errors.
I had to change taskmanager.network.memory.max: 4gb and akka.ask.timeout: 240s to increase the number of tasks.
Now, I am able to increase the number of Tasks/ aka Task vertices.

taskmanager.network.memory.fraction: 0.15
taskmanager.network.memory.max: 4gb
taskmanager.network.memory.min: 500mb
akka.ask.timeout: 240s

On Tue, May 26, 2020 at 8:42 PM Xintong Song <[hidden email]> wrote:
Could you also explain how you set the parallelism when getting this execution plan?
I'm asking because this json file itself only shows the resulted execution plan. It is not clear to me what is not working as expected in your case. E.g., you set the parallelism for an operator to 10 but the execution plan only shows 5.

Thank you~

Xintong Song



On Wed, May 27, 2020 at 3:16 AM Vijay Balakrishnan <[hidden email]> wrote:
Hi Xintong,
Thanks for the excellent clarification for tasks.

I attached a sample screenshot above, but it didn't reflect the slots used and the tasks limit I was running into.

I am attaching my execution plan here. Please let me know how I can increase the number of tasks, i.e., the parallelism. As I increase the parallelism, I run into this bottleneck with the tasks.

BTW - The https://flink.apache.org/visualizer/ is a great start to see this.
TIA,

On Sun, May 24, 2020 at 7:52 PM Xintong Song <[hidden email]> wrote:
Increasing network memory buffers (fraction, min, max) seems to increase tasks slightly.
That's weird. I don't think the number of network memory buffers has anything to do with the number of tasks.

Let me try to clarify a few things.

Please be aware that, how many tasks a Flink job has, and how many slots a Flink cluster has, are two different things.
- The number of tasks is decided by your job's parallelism and topology. E.g., if your job graph has 3 vertices A, B and C, with parallelism 2, 3, 4 respectively, then you would have 9 (2+3+4) tasks in total.
- The number of slots are decided by number of TMs and slots-per-TM.
- For streaming jobs, you have to make sure the number of slots is enough for executing all your tasks. The number of slots needed for executing your job is by default the max parallelism of your job graph vertices. Take the above example, you would need 4 slots, because it's the max among all the vertices' parallelisms (2, 3, 4).
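The arithmetic above can be sketched as follows (toy vertex parallelisms from the A/B/C example, not your actual job graph):

```python
# Toy topology from the example above: the total task count is the SUM of
# the vertex parallelisms, while the slots needed (by default) are the MAX.
vertex_parallelism = {"A": 2, "B": 3, "C": 4}

total_tasks = sum(vertex_parallelism.values())
slots_needed = max(vertex_parallelism.values())

print(total_tasks)   # 9
print(slots_needed)  # 4
```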

In your case, the screenshot shows that your job has 9621 tasks in total (not around 18000; the dark box shows total tasks while the green box shows running tasks), and 600 slots are in use (658 - 58), suggesting that the max parallelism of your job graph vertices is 600.

If you want to increase the number of tasks, you should increase your job parallelism. There are several ways to do that.
  • In your job codes (assuming you are using DataStream API)
    • Use `StreamExecutionEnvironment#setParallelism()` to set parallelism for all operators.
    • Use `SingleOutputStreamOperator#setParallelism()` to set parallelism for a specific operator. (Only supported for subclasses of `SingleOutputStreamOperator`.)
  • When submitting your job, use `-p <parallelism>` as an argument for the `flink run` command, to set parallelism for all operators.
  • Set `parallelism.default` in your `flink-conf.yaml`, to set a default parallelism for your jobs. This will be used for jobs that have not set parallelism with neither of the above methods.
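The precedence among these options — operator-level overrides environment-level, which overrides `-p`, which overrides `parallelism.default` — can be sketched in plain Python; this illustrates only the resolution order and is not Flink API code:

```python
# Illustration only: how the parallelism options above override each other.
# Operator setParallelism() > env setParallelism() > `flink run -p` > default.
def effective_parallelism(operator=None, env=None, client_p=None, default=1):
    for value in (operator, env, client_p):
        if value is not None:
            return value
    return default

print(effective_parallelism(operator=600, env=128))  # 600: operator wins
print(effective_parallelism(env=128, client_p=64))   # 128: env beats -p
print(effective_parallelism())                       # 1: parallelism.default
```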

Thank you~

Xintong Song



On Sat, May 23, 2020 at 1:11 AM Vijay Balakrishnan <[hidden email]> wrote:
Hi Xintong,
Thx for your reply.  Increasing network memory buffers (fraction, min, max) seems to increase tasks slightly.

Streaming job
Standalone

Vijay

On Fri, May 22, 2020 at 2:49 AM Xintong Song <[hidden email]> wrote:
Hi Vijay,

I don't think your problem is related to the number of open files. The parallelism of your job is decided before it actually tries to open the files. And if the OS limit on open files were reached, you would see a job execution failure, instead of a successful execution with lower parallelism.

Could you share some more information about your use case?
  • What kind of job are you executing? Is it a streaming or batch processing job?
  • Which Flink deployment do you use? Standalone? Yarn?
  • It would be helpful if you can share the Flink logs.

Thank you~

Xintong Song



On Wed, May 20, 2020 at 11:50 PM Vijay Balakrishnan <[hidden email]> wrote:
Hi,
I have increased the number of slots available but the Job is not using all the slots but runs into this approximate 18000 Tasks limit. Looking into the source code, it seems to be opening file - https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java#L203
So, do I have to tune the ulimit or something similar at the Ubuntu OS level to increase the number of tasks available? What I am confused about is that the ulimit is per machine, but the ExecutionGraph is across many machines? Please pardon my ignorance here. Does the number of tasks equate to the number of open files? I am using 15 slots per TaskManager on AWS m5.4xlarge, which has 16 vCPUs.

TIA.

On Tue, May 19, 2020 at 3:22 PM Vijay Balakrishnan <[hidden email]> wrote:
Hi,

Flink Dashboard UI seems to show a hard limit of around 18000 for the Tasks column on an Ubuntu Linux box.
I kept increasing the number of slots per TaskManager to 15, and the number of slots increased to 705, but the task count
stayed at around 18000. Below 18000 tasks, the Flink job is able to start up.
Even though I increased the number of slots, it still works when 312 slots are being used.

taskmanager.numberOfTaskSlots: 15

What knob can I tune to increase the number of Tasks ?

Pls find attached the Flink Dashboard UI.

TIA,