Insufficient number of network buffers for simple last_value aggregate


Insufficient number of network buffers for simple last_value aggregate

Schneider, Thilo

Dear list,

When trying to compute a simple LAST_VALUE aggregate, Flink fails with an IOException. The query is defined as follows:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)

# Kafka source table; 't' is a processing-time attribute
t_env.execute_sql("""
CREATE TABLE key_change_test (
  id INT,
  val1 STRING,
  val2 STRING,
  t AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'format' = 'csv',
  'topic' = 'flink_test',
  'properties.bootstrap.servers' = 'localhost:9192',
  'properties.group.id' = 'foo'
)
""")

# Keep the latest val1/val2 per id
tt = t_env.sql_query(
    "SELECT id, LAST_VALUE(val1) AS val1, LAST_VALUE(val2) AS val2 "
    "FROM key_change_test GROUP BY id")

t_env.execute_sql(
    "CREATE TABLE debug (id INT, val1 VARCHAR, val2 VARCHAR) WITH ('connector' = 'print')")
tt.execute_insert("debug")

Looking at the logs, I get the following error message:

[…]

2020-10-30 07:45:46,474 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: TableSourceScan(table=[[default_catalog, default_database, key_change_test]], fields=[id, val1, val2]) (21/88) (02f23a929919c200dbd54b7dcef635e2) switched from DEPLOYING to FAILED.

java.io.IOException: Insufficient number of network buffers: required 89, but only 67 available. The total number of network buffers is currently set to 2048 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'.
        at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:357) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:332) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:224) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.io.network.partition.ResultPartition.setup(ResultPartition.java:146) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:869) [flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:635) [flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.2.jar:1.11.2]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_141]

[…]

What is happening there? It seems to me that Flink requests an awful lot of resources for such a simple query (the Kafka topic has only one partition and is used for manual injection only, so there is hardly any traffic).

Can you help me find a way around this problem?

Thanks in advance

Thilo


Fraport AG Frankfurt Airport Services Worldwide, 60547 Frankfurt am Main, Sitz der Gesellschaft: Frankfurt am Main, Amtsgericht Frankfurt am Main: HRB 7042, Umsatzsteuer-Identifikationsnummer: DE 114150623, Vorsitzender des Aufsichtsrates: Michael Boddenberg – Hessischer Minister der Finanzen; Vorstand: Dr. Stefan Schulte (Vorsitzender), Anke Giesen, Michael Müller, Dr. Pierre Dominique Prümm, Dr. Matthias Zieschang

Re: Insufficient number of network buffers for simple last_value aggregate

Xintong Song
Hi Schneider,

The error message suggests that your task managers are not configured with enough network memory. You would need to increase the network memory configuration. See this doc [1] for more details.
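As a rough orientation (not part of Xintong's reply), the figures in the error message translate directly into memory sizes: 2048 buffers of 32768 bytes each are 64 MiB of network memory per TaskManager. The sketch below uses two hypothetical helpers, `network_memory_bytes` and `network_memory_options`, to turn a desired buffer count into values for `taskmanager.memory.network.min` and `taskmanager.memory.network.max`. It assumes that setting min and max to the same value pins the network memory to that size, provided the TaskManager's total memory can accommodate it.

```python
# Hypothetical helpers, not part of Flink: convert network buffer counts into
# memory sizes for the 'taskmanager.memory.network.*' options.

MIB = 1024 * 1024


def network_memory_bytes(num_buffers: int, segment_size: int = 32 * 1024) -> int:
    """Bytes needed to hold num_buffers buffers of segment_size bytes each."""
    return num_buffers * segment_size


def network_memory_options(num_buffers: int, segment_size: int = 32 * 1024) -> dict:
    """Return min/max settings (whole MiB, rounded up) covering num_buffers buffers.

    Assumption: min == max fixes the network memory at that size.
    """
    size = network_memory_bytes(num_buffers, segment_size)
    mib = -(-size // MIB)  # ceiling division to whole MiB
    value = f"{mib}mb"
    return {
        "taskmanager.memory.network.min": value,
        "taskmanager.memory.network.max": value,
    }


if __name__ == "__main__":
    # Numbers from the error message: 2048 buffers of 32768 bytes each.
    print(network_memory_bytes(2048, 32768) // MIB, "MiB currently configured")
    # Example target of 4096 buffers, printed as flink-conf.yaml style lines.
    for key, value in network_memory_options(4096).items():
        print(f"{key}: {value}")
```

With the default 32 KiB segment size, doubling the buffer count to 4096 corresponds to 128 MiB, which is the kind of value one would place in the TaskManager configuration.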

On Fri, Oct 30, 2020 at 2:53 PM Schneider, Thilo <[hidden email]> wrote:


Re: Insufficient number of network buffers for simple last_value aggregate

Arvid Heise
Hi Thilo,

The number of required network buffers depends on your data exchanges and parallelism.
For each shuffling data exchange (which a GROUP BY needs), you ideally have #slots-per-TM^2 * #TMs * 4 buffers.

Judging from the parallelism of 88 in your log, I'm assuming you have 11 machines with 8 slots each. For best performance, you should then give the network stack
8 * 8 * 11 * 4 = 2816 buffers.

It may work with fewer buffers, but depending on the final topology you may also have more than one shuffling data exchange.
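Arvid's rule of thumb can be written as a small function. This is a sketch, not Flink's internal buffer accounting; comparing its result with the 2048 buffers in the error message suggests it is a per-TaskManager figure. The `shuffles` parameter is an assumption that simply scales the estimate linearly for topologies with more than one shuffling exchange.

```python
# Rule of thumb from the reply above, expressed as a function.
# Assumption: several shuffling exchanges add up linearly (`shuffles`).

def recommended_buffers(slots_per_tm: int, num_tms: int, shuffles: int = 1) -> int:
    """Estimate network buffers per TaskManager: slots_per_tm^2 * num_tms * 4 per shuffle."""
    return slots_per_tm ** 2 * num_tms * 4 * shuffles


if __name__ == "__main__":
    # 11 TaskManagers with 8 slots each, i.e. parallelism 88 as in "(21/88)".
    needed = recommended_buffers(slots_per_tm=8, num_tms=11)
    print(needed)  # 2816
    # Memory needed at the default segment size of 32 KiB per buffer.
    print(needed * 32 * 1024 // (1024 * 1024), "MiB at 32 KiB per buffer")  # 88
```

The estimate of 2816 buffers (88 MiB) exceeds the 2048 buffers (64 MiB) reported in the error message, which is consistent with the failure.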

So what can you do?
1) Downscale your job. If your data rate is low, you may not need such a high parallelism.
2) Use more, smaller nodes. If you are running in the cloud, you can use more TMs with fewer slots each. That significantly decreases the number of network connections per TM.
3) Increase the network memory size (see Xintong's answer). For larger setups, it's usually enough to increase taskmanager.memory.network.fraction to 0.2 or 0.3. Be aware that you may need to decrease other memory fractions accordingly.
4) Decrease the buffer size. Smaller buffers mean more buffers for the same memory. Set taskmanager.memory.segment-size to a value lower than the default of 32kb.
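To compare the four options side by side, the sketch below reuses the rule of thumb and assumes the 64 MiB of network memory implied by the error message (2048 buffers of 32 KiB). The function names and the scenario values (2 TMs with 8 slots for downscaling, 44 TMs with 2 slots, 96 MiB of network memory, 16 KiB segments) are hypothetical choices for illustration; option 3 is modelled with an absolute size rather than a fraction, and the model ignores everything except the buffer-count estimate.

```python
# Hypothetical what-if calculator for the options listed above.
# Assumes the rule of thumb slots_per_tm^2 * num_tms * 4 (one shuffle) and the
# 64 MiB of network memory implied by the error message (2048 x 32 KiB).

MIB = 1024 * 1024


def recommended_buffers(slots_per_tm: int, num_tms: int) -> int:
    """Rule-of-thumb buffer demand per TaskManager for one shuffling exchange."""
    return slots_per_tm ** 2 * num_tms * 4


def available_buffers(network_memory_bytes: int, segment_size: int) -> int:
    """Number of buffers that fit into the given network memory."""
    return network_memory_bytes // segment_size


def fits(slots_per_tm: int, num_tms: int,
         network_memory_bytes: int = 64 * MIB,
         segment_size: int = 32 * 1024) -> bool:
    """True if the estimated demand is covered by the available buffers."""
    return recommended_buffers(slots_per_tm, num_tms) <= available_buffers(
        network_memory_bytes, segment_size)


if __name__ == "__main__":
    scenarios = {
        "current: 11 TMs x 8 slots": fits(8, 11),
        "1) downscale: 2 TMs x 8 slots": fits(8, 2),
        "2) 44 TMs x 2 slots": fits(2, 44),
        "3) 96 MiB network memory": fits(8, 11, network_memory_bytes=96 * MIB),
        "4) 16 KiB segments": fits(8, 11, segment_size=16 * 1024),
    }
    for name, ok in scenarios.items():
        print(f"{name}: {'fits' if ok else 'too few buffers'}")
```

Under these assumptions only the current layout falls short (2816 needed versus 2048 available); each of the four changes brings the estimate within the available buffers.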



On Fri, Oct 30, 2020 at 8:08 AM Xintong Song <[hidden email]> wrote:


--

Arvid Heise | Senior Java Developer


--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng