Hi all,
I'm reading a large number of small files from HDFS in batch mode (about 20 directories, each containing about 3000 files, using recursive.file.enumeration=true). Each time, after about 200 GB of received data, my job fails with the following exception:

java.io.IOException: Error opening the Input Split hdfs:///filepath/filename.csv.gz [0,-1]: Could not obtain block: BP-812793611-127.0.0.1-1455882335652:blk_1075977174_2237313 file=/filepath/filename.csv.gz
    at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:693)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:140)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-812793611-127.0.0.1-1455882335652:blk_1075977174_2237313 file=/filepath/filename.csv.gz
    at org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:984)
    at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:642)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
    at java.io.FilterInputStream.read(Unknown Source)
    at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
    at java.util.zip.CheckedInputStream.read(Unknown Source)
    at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
    at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
    at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
    at java.util.zip.GZIPInputStream.<init>(Unknown Source)
    at java.util.zip.GZIPInputStream.<init>(Unknown Source)
    at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
    at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
    at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
    at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
    ... 5 more

I checked the file each time, and it exists and is healthy. Looking at the taskmanager logs, I found the following exceptions, which suggest it is running out of connections:

2016-10-15 18:20:27,034 WARN org.apache.hadoop.hdfs.BlockReaderFactory - I/O error constructing remote block reader.
java.net.SocketException: No buffer space available (maximum connections reached?): connect
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Unknown Source)
    at sun.nio.ch.Net.connect(Unknown Source)
    at sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
    at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
    at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3436)
    at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:777)
    at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:694)
    at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
    at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:673)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
    at java.io.FilterInputStream.read(Unknown Source)
    at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
    at java.util.zip.CheckedInputStream.read(Unknown Source)
    at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
    at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
    at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
    at java.util.zip.GZIPInputStream.<init>(Unknown Source)
    at java.util.zip.GZIPInputStream.<init>(Unknown Source)
    at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
    at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
    at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
    at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
    at org.myorg.quickstart.MyTextInputFormat.open(MyTextInputFormat.java:99)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:140)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Unknown Source)

2016-10-15 18:20:27,034 WARN org.apache.hadoop.hdfs.DFSClient - Failed to connect to /x.x.x.x:50010 for block, add to deadNodes and continue.
java.net.SocketException: No buffer space available (maximum connections reached?): connect
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Unknown Source)
    at sun.nio.ch.Net.connect(Unknown Source)
    at sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
    at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
    at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3436)
    at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:777)
    at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:694)
    at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
    at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:673)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
    at java.io.FilterInputStream.read(Unknown Source)
    at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
    at java.util.zip.CheckedInputStream.read(Unknown Source)
    at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
    at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
    at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
    at java.util.zip.GZIPInputStream.<init>(Unknown Source)
    at java.util.zip.GZIPInputStream.<init>(Unknown Source)
    at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
    at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
    at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
    at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
    at org.myorg.quickstart.MyTextInputFormat.open(MyTextInputFormat.java:99)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:140)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Unknown Source)

I inspected the open connections and found that the job opens a very large number of connections that remain stuck in the CLOSE_WAIT state, which I guess exhausts the ephemeral port space after some time.

I'm running Flink 1.1.2 on Windows 10 (1 node with 1 TaskManager) with a parallelism of 8. I got the same exception even with the job parallelism set to 1, and it also happened after upgrading to Flink 1.1.3.

Any idea what the root cause of the problem could be and how to solve it?

Thank you.

Best,
Yassine
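For context, a batch source with recursive file enumeration of the kind described above is typically declared as in the following minimal sketch. This is a hypothetical example with a placeholder path; the actual job code was not posted in the thread.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class RecursiveGzipReadJob {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // recursive.file.enumeration makes the input format descend into
        // subdirectories instead of reading only the top-level directory.
        Configuration parameters = new Configuration();
        parameters.setBoolean("recursive.file.enumeration", true);

        // Flink selects the gzip decompressor from the .gz file extension.
        // "hdfs:///filepath" is a placeholder for the parent directory.
        DataSet<String> lines = env
                .readTextFile("hdfs:///filepath")
                .withParameters(parameters);

        lines.first(10).print();
    }
}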
Hi!
This looks to me like the following problem: the decompression streams did not properly forward the "close()" calls. The fix is in this pull request: https://github.com/apache/flink/pull/2581. It is in the latest 1.2-SNAPSHOT, but did not make it into version 1.1.3.

I have pushed the fix into the latest 1.1-SNAPSHOT branch. If you get the code via "git clone -b release-1.1 https://github.com/apache/flink.git", you will get the code that is the same as the 1.1.3 release, plus the patch for this problem.

Greetings,
Stephan

On Sat, Oct 15, 2016 at 10:11 PM, Yassine MARZOUGUI <[hidden email]> wrote:
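For readers hitting this later, the leak pattern behind the fix looks roughly like the sketch below: a decompression wrapper that does not forward close() keeps the underlying HDFS connection open. This is only an illustrative example with made-up class and field names, not the actual Flink code from the pull request.

import java.io.IOException;
import java.io.InputStream;

// Illustrative only -- not the Flink implementation. A decompression wrapper
// must forward close() to the stream it decorates; otherwise every finished
// split leaves its DataNode socket in CLOSE_WAIT until the ephemeral port
// space is exhausted.
class ClosingDecompressionStream extends InputStream {

    private final InputStream hdfsStream;  // the raw HDFS input stream
    private final InputStream inflater;    // e.g. a GZIPInputStream wrapping it

    ClosingDecompressionStream(InputStream hdfsStream, InputStream inflater) {
        this.hdfsStream = hdfsStream;
        this.inflater = inflater;
    }

    @Override
    public int read() throws IOException {
        return inflater.read();
    }

    @Override
    public void close() throws IOException {
        try {
            inflater.close();
        } finally {
            hdfsStream.close();  // the essential part: release the socket
        }
    }
}

With the patched release-1.1 branch checked out, a standard Maven build (for example "mvn clean install -DskipTests") should produce a build equivalent to 1.1.3 plus the fix.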
That solved my problem, thank you!

Best,
Yassine

2016-10-16 19:18 GMT+02:00 Stephan Ewen <[hidden email]>:
Happy to hear it!

On Mon, Oct 17, 2016 at 9:31 AM, Yassine MARZOUGUI <[hidden email]> wrote: