SocketException: Too many open files

SocketException: Too many open files

mars
Hi,

  I have a simple Flink job that reads data from a Kafka topic, generates per-minute aggregations, and writes them to Elasticsearch.

  I am running the job as a Flink YARN session on an EMR cluster. It runs fine for about an hour and then stops; when I checked the logs I saw the following.

 Caused by:
org.apache.flink.shaded.netty4.io.netty.channel.ChannelException: Unable to
create Channel from class class
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel
        at
org.apache.flink.shaded.netty4.io.netty.channel.ReflectiveChannelFactory.newChannel(ReflectiveChannelFactory.java:46)
        at
org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:309)
        at
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:159)
        at
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:143)
        at
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:127)
        at
org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:333)
        at
org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:272)
        at
org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:214)
        at
org.apache.flink.client.program.rest.RestClusterClient.lambda$null$22(RestClusterClient.java:629)
        at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
        at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
        at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:594)
        at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.GeneratedConstructorAccessor22.newInstance(Unknown Source)
        at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at
org.apache.flink.shaded.netty4.io.netty.channel.ReflectiveChannelFactory.newChannel(ReflectiveChannelFactory.java:44)
        ... 17 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.ChannelException:
Failed to open a socket.
        at
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:70)
        at
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.<init>(NioSocketChannel.java:87)
        at
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.<init>(NioSocketChannel.java:80)
        ... 21 more
Caused by: java.net.SocketException: Too many open files
        at sun.nio.ch.Net.socket0(Native Method)
        at sun.nio.ch.Net.socket(Net.java:411)
        at sun.nio.ch.Net.socket(Net.java:404)
        at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
        at
sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60)
        at
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:68)
        ... 23 more

Along with the sink function that writes the data to Elasticsearch, I am calling print() on the SingleOutputStreamOperator (the stream returned once I calculate the aggregation based on a tumbling window).

I am also calling DataStreamUtils.collect() on the same stream to log the info in the stream.

These two are only enabled in the DEV environment.

I have updated limits.conf and also set fs.file-max = 2097152 on the master node as well as on all worker nodes, and I am still getting the same issue.

Thanks
Sateesh



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: SocketException: Too many open files

Ken Krugler
Hi Mars,

A few questions:

1. What version of Flink are you using?

2. Are you using the default ES sink, or did you write your own?

3. What class of EC2 slave are you using?

4. What’s the parallelism of the ES sink?

5. To verify the actual open file limit, you need to…

 * scp your private key to the EMR master
 * ssh onto the EMR master
 * ssh (using the same key) from the master to one of the EMR slaves
 * sudo -u yarn bash -c 'ulimit -a'

For many classes of EC2 servers, you get 32K max open files.

I haven’t looked at exactly how the ES sink configures things, but the ES REST client (by default) has a connection pool of size 30, and these connections aren’t closed immediately.

Each connection uses 3 file descriptors (one a_inode, two FIFO). So you could get about 100 open files per sink sub-task.
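One way to check that descriptor breakdown on a worker is to summarize a process's open file descriptors by type from /proc. This is a sketch run against the current shell; substituting the TaskManager's PID (e.g. from `pgrep -f TaskManager`) is an assumption about where to look on a Flink worker:

```shell
# Summarize a process's open descriptors by target type.
# $$ is this shell's PID; on a Flink worker, replace it with the
# TaskManager's PID to see how many sockets/FIFOs/anon inodes it holds.
ls -l /proc/$$/fd | awk 'NR > 1 {print $NF}' \
  | sed 's/:.*//' | sort | uniq -c | sort -rn
```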

— Ken

PS - calling collect() or print() on a data stream would only make sense if it was tiny. Can you use PrintSinkFunction()?

On Sep 25, 2020, at 2:24 PM, mars <[hidden email]> wrote:


--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr


Re: SocketException: Too many open files

mars
Hi,

 I am using Flink 1.10.0 on EMR.

 I am not using the default Flink sink. I have a sink function on the stream, and within its invoke() method I create a data structure (VO) and put it in a map.

 The EMR step I am running is a Spring-based Flink job, and I have a scheduler that runs every minute, looks for items in the map, generates JSON from each VO, sends it to Elasticsearch, and removes it from the HashMap once it has been sent successfully.

 I am using m5.2xlarge instances for the worker nodes and an m5.4xlarge for the master node.

 I have set the ulimit to 500K for all users (*), both soft and hard limits, on the master and worker nodes.
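One caveat worth checking here (a general Linux behavior, not something established in this thread): limits.conf is applied to login sessions via PAM, while a daemon keeps whatever limits it inherited when it was started, so an already-running NodeManager or TaskManager may never see the 500K value. A quick way to confirm what a running process actually has:

```shell
# Effective open-files limit of a running process, read from /proc.
# $$ is this shell; substitute the NodeManager or TaskManager PID
# to verify the limits.conf change actually took effect for YARN.
grep 'Max open files' /proc/$$/limits
```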

 Thanks again for your response.

Sateesh




Re: SocketException: Too many open files

Arvid Heise-3
Hi Sateesh,

my suspicion would be that your custom sink function is leaking connections (which also count against the file limit). Is there a reason you cannot use Flink's ES connector?

I might have more ideas when you share your sink function.

Best,

Arvid

On Sun, Sep 27, 2020 at 7:16 PM mars <[hidden email]> wrote:


--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng