Dear list,

when trying to compute a simple last_value aggregate, Flink fails with an IOException. The query is defined as follows:

    from pyflink.table import EnvironmentSettings, StreamTableEnvironment

    t_env.execute_sql("""

Looking at the logs I get the following error message:

[…]
2020-10-30 07:45:46,474 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, key_change_test]], fields=[id, val1, val2]) (21/88) (02f23a929919c200dbd54b7dcef635e2) switched from DEPLOYING to FAILED.
java.io.IOException: Insufficient number of network buffers: required 89, but only 67 available. The total number of network buffers is currently set to 2048 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'.
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:357) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:332) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:224) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.io.network.partition.ResultPartition.setup(ResultPartition.java:146) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:869) [flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:635) [flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.2.jar:1.11.2]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_141]
[…]

What is happening there? To me it seems that Flink is requesting an awful lot of resources for a simple query (the Kafka topic has only one partition and is used for manual injection only, so there is no big traffic there).
Can you help me with any way around that problem?

Thanks in advance
Thilo
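The execute_sql statement above is cut off in the archive. Purely as an illustration, a minimal PyFlink 1.11 job of the kind described, using the table and field names that appear in the log above, might look roughly like this; the connector options, column types, and the exact LAST_VALUE query are assumptions, not the original code:

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import EnvironmentSettings, StreamTableEnvironment

    # Blink planner in streaming mode (Flink 1.11).
    env = StreamExecutionEnvironment.get_execution_environment()
    settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    t_env = StreamTableEnvironment.create(env, environment_settings=settings)

    # Hypothetical Kafka source; connector options and column types are assumptions.
    t_env.execute_sql("""
        CREATE TABLE key_change_test (
            id STRING,
            val1 STRING,
            val2 STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'key_change_test',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'key_change_test',
            'scan.startup.mode' = 'earliest-offset',
            'format' = 'json'
        )
    """)

    # Hypothetical last_value aggregate per key; this GROUP BY is what
    # introduces the shuffling data exchange discussed in the replies below.
    t_env.execute_sql("""
        SELECT id, LAST_VALUE(val1) AS val1, LAST_VALUE(val2) AS val2
        FROM key_change_test
        GROUP BY id
    """)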
Hi Schneider,

The error message suggests that your task managers are not configured with enough network memory. You would need to increase the network memory configuration. See this doc [1] for more details.

On Fri, Oct 30, 2020 at 2:53 PM Schneider, Thilo <[hidden email]> wrote:
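The settings mentioned in the error message live in flink-conf.yaml. Purely as an illustrative sketch (the concrete values below are assumptions and have to be sized against the task managers' total memory), increasing the network memory could look like this:

    # flink-conf.yaml -- illustrative values, not a recommendation
    taskmanager.memory.network.fraction: 0.2   # default: 0.1
    taskmanager.memory.network.min: 128mb      # default: 64mb
    taskmanager.memory.network.max: 1gb        # default: 1gb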
Hi Thilo,

the number of required network buffers depends on your data exchanges and parallelism. For each shuffling data exchange (which you need for the GROUP BY), you ideally have #slots-per-TM^2 * #TMs * 4 buffers. I'm assuming you have 11 machines and 8 slots per machine; then for best performance, you should give the network stack 8 * 8 * 11 * 4 = 2816 buffers. It may work with fewer buffers, but depending on the final topology you may also have more than one shuffling data exchange.

So what can you do:
1) Downscale your job. If your data flow is low, maybe you don't need such a high parallelism.
2) Use more, smaller nodes. If you are running in the cloud, you may use more TMs with a lower number of slots. That decreases the number of network connections per TM significantly.
3) Increase the memory size (see Xintong's answer). For larger setups, it is usually enough to just increase taskmanager.memory.network.fraction to 0.2 or 0.3. Be aware that you may need to decrease other memory fractions accordingly.
4) Decrease the buffer size. Smaller buffers = more buffers. Set taskmanager.memory.segment-size to a lower value than the default 32kb.

On Fri, Oct 30, 2020 at 8:08 AM Xintong Song <[hidden email]> wrote:
-- Arvid Heise
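As a back-of-the-envelope check of Arvid's estimate, here is a small Python sketch. The 11 TMs × 8 slots figure is an assumption (it matches the parallelism of 88 visible in the log), and the 32 KiB segment size is the default reported in the error message:

    # Rough buffer estimate for one shuffling data exchange, following the
    # #slots-per-TM^2 * #TMs * 4 rule of thumb above.
    slots_per_tm = 8            # assumed
    num_tms = 11                # assumed; 8 slots * 11 TMs = parallelism 88 seen in the log
    segment_size = 32 * 1024    # default taskmanager.memory.segment-size (32 KiB)

    buffers_needed = slots_per_tm ** 2 * num_tms * 4
    print(buffers_needed)                            # 2816

    # The error message reports 2048 buffers of 32768 bytes each,
    # i.e. 64 MiB of network memory -- less than the estimate needs.
    current_memory = 2048 * segment_size
    print(current_memory // 2 ** 20)                 # 64 (MiB currently configured)
    print(buffers_needed * segment_size // 2 ** 20)  # 88 (MiB needed at 32 KiB buffers)

    # Option 4 above: halving the segment size doubles the number of buffers
    # that fit into the same 64 MiB of network memory.
    print(current_memory // (16 * 1024))             # 4096 buffers at 16 KiB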