Hi, the Flink Dashboard UI seems to show a hard limit of around 18000 in the Tasks column on an Ubuntu Linux box. I kept increasing the number of slots per TaskManager, up to 15, and the total number of slots increased to 705, but the number of tasks stayed at around 18000. Below 18000 tasks, the Flink job is able to start up. Even though I increased the number of slots, it still works when only 312 slots are being used.

taskmanager.numberOfTaskSlots: 15

What knob can I tune to increase the number of tasks? Please find the Flink Dashboard UI screenshot attached. TIA,

[Attachment: Screen Shot 2020-05-19 at 12.15.20 PM.png (177K)]
Hi, I have increased the number of slots available, but the job is not using all the slots and instead runs into this limit of approximately 18000 tasks. Looking into the source code, it seems to be opening a file: https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java#L203 So, do I have to tune the ulimit or something similar at the Ubuntu OS level to increase the number of tasks available? What confuses me is that the ulimit is per machine, but the ExecutionGraph spans many machines. Please pardon my ignorance here. Does the number of tasks equate to the number of open files? I am using 15 slots per TaskManager on AWS m5.4xlarge, which has 16 vCPUs. TIA. On Tue, May 19, 2020 at 3:22 PM Vijay Balakrishnan <[hidden email]> wrote:
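For reference on the ulimit question: the open-file limit controlled by `ulimit -n` is a per-process resource limit that each TaskManager JVM inherits from the shell or service that launched it, so it is checked on each machine separately rather than across the cluster. A minimal sketch for inspecting it with Python's standard `resource` module (Unix only), assuming it is run under the same user and environment as the TaskManager; whether this limit matters for the task count at all is a separate question.

```python
import resource


def open_file_limits():
    """Return the (soft, hard) RLIMIT_NOFILE of the current process.

    resource.RLIM_INFINITY is mapped to None, meaning "unlimited".
    """
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)

    def normalize(value):
        return None if value == resource.RLIM_INFINITY else value

    return normalize(soft), normalize(hard)


if __name__ == "__main__":
    soft, hard = open_file_limits()
    print("open files soft limit:", "unlimited" if soft is None else soft)
    print("open files hard limit:", "unlimited" if hard is None else hard)
```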
Hi Vijay, I don't think your problem is related to the number of open files. The parallelism of your job is decided before the job actually tries to open the files. And if the OS limit for open files were reached, you would see a job execution failure, rather than a successful execution with a lower parallelism. Could you share some more information about your use case?
Thank you~ Xintong Song On Wed, May 20, 2020 at 11:50 PM Vijay Balakrishnan <[hidden email]> wrote:
Hi Xintong, Thx for your reply. Increasing the network memory buffers (fraction, min, max) seems to increase the number of tasks slightly. It is a streaming job, running on a standalone cluster. Vijay On Fri, May 22, 2020 at 2:49 AM Xintong Song <[hidden email]> wrote:
> Increasing network memory buffers (fraction, min, max) seems to increase tasks slightly.

That's weird. I don't think the number of network memory buffers has anything to do with the number of tasks. Let me try to clarify a few things. Please be aware that how many tasks a Flink job has and how many slots a Flink cluster has are two different things.
- The number of tasks is decided by your job's parallelism and topology. E.g., if your job graph has 3 vertices A, B and C, with parallelism 2, 3 and 4 respectively, then you would have 9 (2+3+4) tasks in total.
- The number of slots is decided by the number of TMs and the number of slots per TM.
- For streaming jobs, you have to make sure the number of slots is enough to execute all your tasks. By default, the number of slots needed to execute your job is the largest parallelism among your job graph vertices. In the above example, you would need 4 slots, because 4 is the largest of all the vertices' parallelisms (2, 3, 4).

In your case, the screenshot shows that your job has 9621 tasks in total (not around 18000; the dark box shows the total tasks while the green box shows the running tasks), and 600 slots are in use (658 - 58), suggesting that the largest parallelism among your job graph vertices is 600. If you want to increase the number of tasks, you should increase your job's parallelism. There are several ways to do that.
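The two counting rules above can be written down in a few lines. This is a sketch, assuming a streaming job in which every vertex (a chain of operators, as shown in the dashboard) stays in the default slot sharing group; jobs that use separate slot sharing groups need more slots than this.

```python
def tasks_and_slots(vertex_parallelism):
    """Return (total_tasks, slots_needed) for a streaming job graph.

    vertex_parallelism maps each job-graph vertex to its parallelism.
    Assumes all vertices share the default slot sharing group, so one
    slot can hold one subtask of every vertex.
    """
    total_tasks = sum(vertex_parallelism.values())
    slots_needed = max(vertex_parallelism.values())
    return total_tasks, slots_needed


# The example from the explanation: A, B, C with parallelism 2, 3 and 4.
print(tasks_and_slots({"A": 2, "B": 3, "C": 4}))  # (9, 4)
```

With 600 as the largest parallelism, 600 slots are needed no matter how many vertices the job has; only the task count grows with the number of vertices.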
Thank you~ Xintong Song On Sat, May 23, 2020 at 1:11 AM Vijay Balakrishnan <[hidden email]> wrote:
Hi Xintong, Thanks for the excellent clarification of tasks. The sample screenshot I attached above didn't reflect the slots used or the task limit I was running into. I am attaching my execution plan here. Please let me know how I can increase the number of tasks, aka parallelism. As I increase the parallelism, I run into this bottleneck with the tasks. BTW, https://flink.apache.org/visualizer/ is a great starting point to see this. TIA, On Sun, May 24, 2020 at 7:52 PM Xintong Song <[hidden email]> wrote:
[Attachment: analytics-execution-plan-real.json (224K)]
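An execution plan JSON like the one attached (the format the visualizer reads) can be summarised with a short script. This is a sketch, assuming the file has a top-level "nodes" list whose entries carry a "parallelism" field. Because that plan describes operators before chaining, the sum of parallelisms is an upper bound on the task count the dashboard shows; chained operators share one task.

```python
import json
import sys
from collections import Counter


def summarize_plan(plan_json):
    """Summarise a Flink execution-plan JSON string.

    Assumes a top-level "nodes" list whose entries carry a "parallelism" field.
    Returns (operator_count, parallelism_sum, largest_parallelism, histogram),
    where histogram counts how many operators run at each parallelism.
    """
    nodes = json.loads(plan_json)["nodes"]
    parallelisms = [int(node["parallelism"]) for node in nodes]
    return len(nodes), sum(parallelisms), max(parallelisms), Counter(parallelisms)


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage: python summarize_plan.py analytics-execution-plan-real.json
    with open(sys.argv[1]) as f:
        count, total, largest, histogram = summarize_plan(f.read())
    print(f"{count} operators, at most {total} subtasks before chaining, "
          f"largest parallelism {largest}")
    for parallelism, n in sorted(histogram.items()):
        print(f"  parallelism {parallelism}: {n} operator(s)")
```

The histogram makes it easy to spot which operators did not pick up the intended parallelism.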
Could you also explain how you set the parallelism when getting this execution plan? I'm asking because this JSON file only shows the resulting execution plan. It is not clear to me what is not working as expected in your case, e.g., you set the parallelism of an operator to 10 but the execution plan only shows 5. Thank you~ Xintong Song On Wed, May 27, 2020 at 3:16 AM Vijay Balakrishnan <[hidden email]> wrote:
Thanks so much, Xintong, for guiding me through this. I looked at the Flink logs to see the errors. I had to change taskmanager.network.memory.max: 4gb and akka.ask.timeout: 240s to increase the number of tasks. Now I am able to increase the number of tasks, aka task vertices.

taskmanager.network.memory.fraction: 0.15
taskmanager.network.memory.max: 4gb
taskmanager.network.memory.min: 500mb
akka.ask.timeout: 240s

On Tue, May 26, 2020 at 8:42 PM Xintong Song <[hidden email]> wrote:
Hi Xintong, Looks like the issue is not fully resolved :( Attaching 2 screenshots of the memory consumption of one of the TaskManagers. To increase the off-heap direct memory, which is being used up, do I change this?

taskmanager.memory.task.off-heap.size: 5gb

I had increased taskmanager.network.memory.max to 24gb, which seems excessive. One of the errors I saw in the Flink logs:

java.io.IOException: Insufficient number of network buffers: required 1, but only 0 available. The total number of network buffers is currently set to 85922 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.network.memory.fraction', 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
	at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:281)
	at org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:191)

TIA, On Wed, May 27, 2020 at 9:06 AM Vijay Balakrishnan <[hidden email]> wrote:
[Attachments: Screen Shot 2020-05-27 at 5.04.17 PM.png (236K), Screen Shot 2020-05-27 at 5.05.12 PM.png (287K)]
Ah, I think I misunderstood what you meant by:

> Below 18000 tasks, the Flink Job is able to start up.

When you said "it still works", I thought you meant that after you increased the parallelism, the job still executed, but with the parallelism not actually increased. From your latest reply, it seems the job's parallelism is indeed increased, but then the job runs into failures. The reason you run into the "Insufficient number of network buffers" exception is that with more tasks in your job, more inter-task data transmission channels are needed, and thus more memory for network buffers. To increase the network memory size, the following configuration options, as you already found, are relevant: taskmanager.network.memory.fraction, taskmanager.network.memory.min and taskmanager.network.memory.max.
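To illustrate why the buffer demand grows with parallelism, here is a rough back-of-the-envelope sketch. It is not Flink's exact accounting. It assumes every shuffle is an all-to-all exchange (such as a keyBy) in which both sides run at parallelism num_tms * slots_per_tm, that each TaskManager hosts one sending and one receiving subtask per slot, that each input channel gets 2 exclusive buffers (taskmanager.network.memory.buffers-per-channel) and each input gate 8 floating buffers (taskmanager.network.memory.floating-buffers-per-gate), which are the defaults in Flink 1.9 as far as I know, and that each output subpartition needs at least one buffer. The function name is hypothetical.

```python
def estimate_buffers_per_tm(num_tms, slots_per_tm, num_shuffles=1,
                            buffers_per_channel=2, floating_per_gate=8):
    """Rough estimate of the network buffers one TaskManager needs.

    Illustrative assumptions, not Flink's exact accounting:
    - each shuffle is all-to-all, both sides at parallelism num_tms * slots_per_tm;
    - each TM runs one sending and one receiving subtask per slot;
    - each input channel gets buffers_per_channel exclusive buffers, each input
      gate gets floating_per_gate floating buffers, and each output
      subpartition needs at least one buffer.
    """
    parallelism = num_tms * slots_per_tm
    # Receiving side: one input channel per upstream subtask, per receiver.
    input_channels = slots_per_tm * parallelism
    receive = input_channels * buffers_per_channel + slots_per_tm * floating_per_gate
    # Sending side: one subpartition per downstream subtask, per sender.
    send = slots_per_tm * parallelism
    return num_shuffles * (receive + send)


for tms in (10, 20, 40):
    per_tm = estimate_buffers_per_tm(tms, slots_per_tm=10)
    mb = per_tm * 32 / 1024  # 32 KB buffers, as in the error message
    print(f"{tms} TMs x 10 slots: ~{per_tm} buffers (~{mb:.0f} MB) per TM")
```

Under these assumptions the per-TaskManager demand grows linearly with the total parallelism, so the cluster-wide demand grows roughly quadratically, which is consistent with the exception appearing only after the parallelism was raised.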
Please be aware that `taskmanager.memory.task.off-heap.size` is not related to network memory, and it is only available in Flink 1.10 and above, while you're using 1.9.1 according to the screenshots. The network memory size is calculated as `min(max(some_total_value * network_fraction, network_min), network_max)`. According to the error message, your current network memory size is `85922 buffers * 32KB/buffer = 2685MB`, which is smaller than your "max" (4gb). That means increasing the "max" does not help in your case. It is the "fraction" that you need to increase. Thank you~ Xintong Song On Thu, May 28, 2020 at 9:30 AM Vijay Balakrishnan <[hidden email]> wrote:
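The formula and the arithmetic from the error message can be checked with a small sketch. Here total_bytes stands in for "some_total_value"; what exactly it covers depends on the Flink version, so it is treated as an input. The helper names are hypothetical, and parse_size only handles the kb/mb/gb suffixes used in this thread.

```python
BUFFER_SIZE = 32768  # bytes per network buffer, as reported in the error message
UNITS = {"kb": 1024, "mb": 1024 ** 2, "gb": 1024 ** 3}


def parse_size(text):
    """Parse the size strings used in this thread ('500mb', '4gb', ...) into bytes."""
    text = text.strip().lower()
    for suffix, factor in UNITS.items():
        if text.endswith(suffix):
            return int(float(text[: -len(suffix)]) * factor)
    return int(text)  # plain number of bytes


def network_memory_bytes(total_bytes, fraction, min_size, max_size):
    """min(max(total * fraction, network_min), network_max)."""
    return min(max(int(total_bytes * fraction), parse_size(min_size)),
               parse_size(max_size))


current = 85922 * BUFFER_SIZE
print(current // 1024 ** 2, "MB")      # 2685 MB
print(current < parse_size("4gb"))     # True: the 4gb "max" was not the limit

# Hypothetical 16gb total: the fraction decides the size until "max" caps it.
for fraction in (0.15, 0.45):
    size = network_memory_bytes(parse_size("16gb"), fraction, "500mb", "24gb")
    print(fraction, size // 1024 ** 2, "MB")
```

As long as the computed size sits between "min" and "max", only the fraction (or the total it is applied to) changes the result.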
Thx, Xintong, for the detailed explanation of the memory fraction. I have increased the memory fraction now. As I increase the defaultParallelism, I keep getting this error:

org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition e312b2db4d1d0c65224664f620d06f7d@c3a4fb1e56a0996d9f2ff86dac6e483f not reachable.
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
	at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
	at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connecting the channel failed: Connecting to remote task manager + '/10.9.239.218:45544' has failed. This might indicate that the remote task manager has been lost.
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
	at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
	... 7 more
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + '/10.9.239.218:45544' has failed. This might indicate that the remote task manager has been lost.
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
	... 1 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: /10.9.239.218:45544
	at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
	... 6 more
	... 10 more

On Wed, May 27, 2020 at 7:14 PM Xintong Song <[hidden email]> wrote:
Hi all, the job takes forever to start up and now fails to start up every time.

Physical Memory: 62.1 GB
JVM Heap Size: 15.0 GB
Flink Managed Memory: 10.5 GB

Attached is a TM screenshot. I tried increasing the following:

taskmanager.numberOfTaskSlots: 10
parallelism.default: 1
rest.server.max-content-length: 314572800
taskmanager.network.memory.fraction: 0.45
taskmanager.network.memory.max: 24gb
taskmanager.network.memory.min: 500mb
akka.ask.timeout: 240s
cluster.evenly-spread-out-slots: true
taskmanager.network.netty.client.connectTimeoutSec: 240
taskmanager.network.detailed-metrics: true
taskmanager.network.memory.floating-buffers-per-gate: 16
akka.tcp.timeout: 30s

There are more than enough slots. The issue seems to be communicating over TCP with remote TaskManagers?? I am getting this exception on a TaskManager:

2020-05-31 20:37:31,436 INFO org.apache.flink.runtime.taskmanager.Task - Window(TumblingEventTimeWindows(5000), EventTimeTrigger, MGroupingWindowAggregate, MGroupingAggregateWindowProcessing) (36/440) (921fe6761ce844a6850c5fc67326b221) switched from DEPLOYING to FAILED.
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition faea47916a206dc8d014694ec72ab577@95f71b39868d4e23a180ce11653dc4ca not reachable.
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
	at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
	at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connecting the channel failed: Connecting to remote task manager + '/10.127.106.54:33564' has failed. This might indicate that the remote task manager has been lost.
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
	at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
	... 7 more
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + '/10.127.106.54:33564' has failed. This might indicate that the remote task manager has been lost.
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
	... 1 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: /10.127.106.54:33564

On Fri, May 29, 2020 at 12:43 PM Vijay Balakrishnan <[hidden email]> wrote:
[Attachment: Screen Shot 2020-05-31 at 1.56.02 PM.png (246K)]
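One way to test the "communicating over TCP" suspicion independently of Flink is to open a plain TCP connection to the address from the exception, from the machine that logged it. A minimal sketch using Python's `socket.create_connection`; the 10 second timeout is an arbitrary choice. A timeout here would point at the network (security groups, firewalls) or at an overloaded remote process, while a success means the port is reachable and the problem lies elsewhere. Since taskmanager.data.port defaults to 0 (a random port), the address should be taken from the most recent log.

```python
import socket
import sys


def can_connect(host, port, timeout_s=10.0):
    """Return True if a TCP connection to host:port succeeds within timeout_s."""
    try:
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:  # covers timeouts, refused connections and unreachable hosts
        return False


if __name__ == "__main__" and len(sys.argv) == 3:
    # Usage: python probe.py 10.127.106.54 33564
    host, port = sys.argv[1], int(sys.argv[2])
    print(f"{host}:{port} reachable: {can_connect(host, port)}")
```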
Hi Vijay, The error message suggests that another TaskManager (10.127.106.54) is not responding. This can happen when the remote TaskManager has failed or is under severe GC pressure. You would need to find the log of the remote TaskManager to understand what is happening. Thank you~ Xintong Song On Mon, Jun 1, 2020 at 4:57 AM Vijay Balakrishnan <[hidden email]> wrote:
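When many tasks fail at once, it helps to know which TaskManagers are reported as unreachable, so you know whose logs to fetch. A small sketch that scans a TaskManager log for the "Connecting to remote task manager ... has failed" message quoted in this thread and counts the addresses. The regular expression is written against that exact wording (including the stray `+ '`), only matches IP addresses, and may need adjusting for other Flink versions. Each failure usually appears on two "Caused by" lines, so the counts are relative rather than exact.

```python
import re
import sys
from collections import Counter

# Matches e.g. "Connecting to remote task manager + '/10.127.106.54:33564' has failed"
REMOTE_TM = re.compile(
    r"Connecting to remote task manager \+? ?'/?([\d.]+):(\d+)' has failed"
)


def unreachable_task_managers(lines):
    """Count (host, port) pairs reported as unreachable in the given log lines."""
    counts = Counter()
    for line in lines:
        for host, port in REMOTE_TM.findall(line):
            counts[(host, int(port))] += 1
    return counts


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage: python find_unreachable.py taskmanager.log
    with open(sys.argv[1], errors="replace") as log:
        for (host, port), n in unreachable_task_managers(log).most_common():
            print(f"{host}:{port}  reported {n} time(s)")
```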
Thx a ton, Xintong. I am using this configuration now:

taskmanager.numberOfTaskSlots: 14
rest.server.max-content-length: 314572800
taskmanager.network.memory.fraction: 0.45
taskmanager.network.memory.max: 24gb
taskmanager.network.memory.min: 500mb
akka.ask.timeout: 240s
cluster.evenly-spread-out-slots: true
akka.tcp.timeout: 240s
taskmanager.network.request-backoff.initial: 5000
taskmanager.network.request-backoff.max: 30000
web.timeout: 1000000

I still get an error on startup when loading the Flink jar. It resolves itself after failing on the first few tries. This is where taskmanager.network.request-backoff.initial: 5000 helped a little bit. I would like to get this job starting successfully on the first try. I am also attaching a screenshot of the error on job failure. Exception:

org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition ce6b601e14b959de21d8351a6c5cf70c@1f2cd0d827586a4bc7b6f40ad2609db1 not reachable.
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
	...
Caused by: java.io.IOException: Connecting the channel failed: Connecting to remote task manager + '/10.128.49.96:43060' has failed. This might indicate that the remote task manager has been lost.
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
	... 7 more
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + '/10.128.49.96:43060' has failed. This might indicate that the remote task manager has been lost.
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
	...
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: /10.128.49.96:43060
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
	at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
	... 6 more
Caused by: java.net.ConnectException: Connection timed out
	... 10 more

TIA, On Sun, May 31, 2020 at 8:08 PM Xintong Song <[hidden email]> wrote:
[Attachment: Screen Shot 2020-06-03 at 7.46.28 AM.png (249K)]
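As a rough illustration of what the two request-backoff settings do, the retry delays can be listed under an assumption: that the backoff starts at request-backoff.initial, doubles on every retry and stops once request-backoff.max has been used, which is how the input channel backoff behaves in Flink 1.9 to the best of my understanding. These retries apply to partition requests that find the upstream partition not yet available, which may be why a larger initial backoff helped during the slow startup; this sketch does not assume that connection timeouts are retried the same way. 100 and 10000 ms are, as far as I know, the Flink 1.9 defaults. The function name is hypothetical.

```python
def backoff_schedule(initial_ms, max_ms):
    """List the delays of an exponential backoff that starts at initial_ms,
    doubles on every retry, is capped at max_ms, and stops after the cap has
    been used once (mirroring, by assumption, Flink's input channel logic).
    """
    if initial_ms <= 0:
        return []  # assumption: a non-positive initial value disables backoff
    delays = [initial_ms]
    while delays[-1] < max_ms:
        delays.append(min(delays[-1] * 2, max_ms))
    return delays


for initial, maximum in ((100, 10_000), (5_000, 30_000)):
    delays = backoff_schedule(initial, maximum)
    print(f"initial={initial} max={maximum}: {len(delays)} retries, "
          f"{sum(delays) / 1000:.1f}s total, delays={delays}")
```

Under these assumptions, 5000/30000 spreads 4 retries over about 65 seconds, compared with 8 retries over about 23 seconds for 100/10000, giving slow-starting upstream tasks more time to register their partitions.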
Hi Vijay, From the information you provided (the configuration, error message and screenshot), I'm not able to figure out what the problem is or how to resolve it. The error message comes from a healthy TaskManager, which discovered that another TaskManager is not responding. We would need to look into the log of the TaskManager that is not responding to understand what's wrong with it. Thank you~ Xintong Song On Fri, Jun 5, 2020 at 6:06 AM Vijay Balakrishnan <[hidden email]> wrote: