Hi,
I have a simple Flink job that reads data from a Kafka topic, generates per-minute aggregations, and writes them to Elasticsearch. I am running the job in a Flink YARN session on an EMR cluster. It runs fine for about an hour and then stops, and when I checked the logs I saw the following:

Caused by: org.apache.flink.shaded.netty4.io.netty.channel.ChannelException: Unable to create Channel from class class org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel
    at org.apache.flink.shaded.netty4.io.netty.channel.ReflectiveChannelFactory.newChannel(ReflectiveChannelFactory.java:46)
    at org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:309)
    at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:159)
    at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:143)
    at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:127)
    at org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:333)
    at org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:272)
    at org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:214)
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$null$22(RestClusterClient.java:629)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:594)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedConstructorAccessor22.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.flink.shaded.netty4.io.netty.channel.ReflectiveChannelFactory.newChannel(ReflectiveChannelFactory.java:44)
    ... 17 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.ChannelException: Failed to open a socket.
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:70)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.<init>(NioSocketChannel.java:87)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.<init>(NioSocketChannel.java:80)
    ... 21 more
Caused by: java.net.SocketException: Too many open files
    at sun.nio.ch.Net.socket0(Native Method)
    at sun.nio.ch.Net.socket(Net.java:411)
    at sun.nio.ch.Net.socket(Net.java:404)
    at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
    at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:68)
    ... 23 more

Besides the sink function that writes the data to Elasticsearch, I am also calling print() on the SingleOutputStreamOperator (the stream returned after I calculate the aggregation over a tumbling window), and I am calling DataStreamUtils.collect() on the same stream to log its contents. Both of these are enabled only in the DEV environment.

I have updated limits.conf and also set fs.file-max = 2097152 on the master node as well as on all worker nodes, and I am still getting the same issue.
Thanks,
Sateesh

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi Mars,
A few questions:

1. What version of Flink are you using?
2. Are you using the default ES sink, or did you write your own?
3. What class of EC2 slave are you using?
4. What's the parallelism of the ES sink?
5. To verify the actual open file limit, you need to:
   * scp your private key to the EMR master
   * ssh onto the EMR master
   * ssh (using the same key) from the master to one of the EMR slaves
   * sudo -u yarn bash -c 'ulimit -a'

For many classes of EC2 servers, you get a 32K max open files limit. I haven't looked at exactly how the ES sink configures things, but the ES REST client (by default) has a connection pool of size 30, and these connections aren't closed immediately. Each connection uses 3 file descriptors (one a_inode, two FIFO), so you could get about 100 open files per sink sub-task.

-- Ken

PS - calling collect() or print() on a data stream would only make sense if it was tiny. Can you use PrintSinkFunction() instead?
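Beyond checking ulimit, it can help to watch how many descriptors the TaskManager JVM actually holds over time. A minimal JDK-only sketch (the cast to UnixOperatingSystemMXBean assumes a HotSpot JVM on Linux, which is what EMR runs; FdCheck is a made-up class name):

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import com.sun.management.UnixOperatingSystemMXBean;

public class FdCheck {
    public static void main(String[] args) {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof UnixOperatingSystemMXBean) {
            UnixOperatingSystemMXBean unix = (UnixOperatingSystemMXBean) os;
            // Descriptors currently open by this JVM vs. the process limit.
            System.out.println("open fds: " + unix.getOpenFileDescriptorCount()
                    + " / max: " + unix.getMaxFileDescriptorCount());
        }
    }
}
```

Logging this periodically (or scraping /proc/PID/fd on the slave) shows whether the count climbs steadily toward the limit, which points to a leak rather than an undersized limit.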
--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr
Hi,
I am using Flink 1.10.0 on EMR. I am not using the default Flink sink. I have a custom SinkFunction on the stream; inside its invoke() method I create a data structure (a VO) and put it in a Map. The EMR step I am running is a Spring-based Flink job, and I have a scheduler that runs every minute, looks for items in the Map, generates JSON from each VO, sends it to Elasticsearch, and removes it from the HashMap once it has been sent to ES successfully.

I am using m5.2xlarge instances for the worker nodes and m5.4xlarge for the master node. I have set the ulimit to 500K for all users (*), both soft and hard limits, on the master and worker nodes.

Thanks again for your response.
Sateesh
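One thing worth checking in a sink built this way: if each scheduled flush (or each invoke()) constructs a new ES client or HTTP connection that is never closed, descriptors accumulate until "Too many open files" is hit, no matter how high the ulimit is set. A self-contained illustration of that pattern, using plain file handles as stand-ins for connections (FdLeakDemo is a hypothetical name, not code from the job):

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import com.sun.management.UnixOperatingSystemMXBean;

public class FdLeakDemo {
    static long openFds() {
        return ((UnixOperatingSystemMXBean)
                ManagementFactory.getOperatingSystemMXBean())
                .getOpenFileDescriptorCount();
    }

    public static void main(String[] args) throws IOException {
        long before = openFds();
        FileInputStream[] held = new FileInputStream[100];
        // Anti-pattern: a new "connection" per flush that is never closed.
        for (int i = 0; i < 100; i++) {
            held[i] = new FileInputStream("/dev/null");
        }
        System.out.println("descriptors gained: " + (openFds() - before));
        // The fix is to close (or reuse) the client: in a RichSinkFunction,
        // create it once in open() and release it in close().
        for (FileInputStream in : held) {
            in.close();
        }
    }
}
```

Run under a low `ulimit -n`, the loop above eventually throws the same "Too many open files" seen in the TaskManager logs.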
Hi Sateesh,

my suspicion would be that your custom sink function is leaking connections (which also count toward the file limit). Is there a reason you cannot use Flink's ES connector? I might have more ideas once you share your sink function.

Best,

Arvid

On Sun, Sep 27, 2020 at 7:16 PM mars <[hidden email]> wrote:
> Hi,

--
Arvid Heise | Senior Java Developer
Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
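If switching to the bundled connector is an option, a rough sketch of what that could look like with flink-connector-elasticsearch7 on Flink 1.10 follows. The host, index name, and the MyAgg/toJsonMap types are placeholders standing in for the job's own aggregation type, not anything from this thread:

```java
List<HttpHost> hosts = Collections.singletonList(
        new HttpHost("es-host", 9200, "http"));

ElasticsearchSink.Builder<MyAgg> builder = new ElasticsearchSink.Builder<>(
        hosts,
        (MyAgg agg, RuntimeContext ctx, RequestIndexer indexer) ->
                indexer.add(Requests.indexRequest()
                        .index("minute-aggs")
                        .source(toJsonMap(agg))));

// Batch writes instead of one request per element; flush roughly once
// a minute, matching the existing scheduler's cadence.
builder.setBulkFlushInterval(60_000);
builder.setBulkFlushMaxActions(1000);

aggregatedStream.addSink(builder.build());
```

The connector keeps one REST client per sink sub-task and closes it on shutdown, so descriptor usage stays bounded; the bulk flush settings replace the hand-rolled HashMap-plus-scheduler buffering. Check the connector docs for your exact version before relying on this sketch.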