Recursive directory reading error

Flavio Pompermaier
Hi to all,

I'm trying to recursively read a directory, but it seems that the totalLength value in FileInputFormat.createInputSplits() is not computed correctly.

I have files organized as:

/tmp/myDir/A/B/cunk-1.txt
/tmp/myDir/A/B/cunk-2.txt
 ..

If I try to do the following:

Configuration parameters = new Configuration();
parameters.setBoolean("recursive.file.enumeration", true);
env.readTextFile("file:///tmp/myDir").withParameters(parameters).print();

I get:

Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Java heap space
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:471)
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:515)
... 19 more
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2219)
at java.util.ArrayList.grow(ArrayList.java:242)
at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
at java.util.ArrayList.add(ArrayList.java:440)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:503)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:51)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)

Am I doing something wrong or is it a bug?

Best,
Flavio

Re: Recursive directory reading error

rmetzger0
Hi Flavio,

How many files are in the directory?
You can count them with "find /tmp/myDir | wc -l".

Flink running out of memory while creating input splits indicates to me that there are a lot of files in there.

Re: Recursive directory reading error

Flavio Pompermaier
I have 10 files. I debugged the code, and it seems there's a loop in FileInputFormat when files are nested deep below the root directory of the scan.

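For anyone who wants to reproduce this locally, here is a minimal sketch (the class name and file contents are made up) that recreates the nested layout from my first message before running the job:

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class MakeNestedInput {
    public static void main(String[] args) throws Exception {
        // Recreate the reported layout: 10 small files two levels
        // below the scan root /tmp/myDir.
        Path dir = Paths.get("/tmp/myDir/A/B");
        Files.createDirectories(dir);
        for (int i = 1; i <= 10; i++) {
            Files.write(dir.resolve("cunk-" + i + ".txt"),
                    ("some text " + i + "\n").getBytes());
        }
    }
}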

Re: Recursive directory reading error

Maximilian Michels
Yes, there is a loop to recursively search for files in a directory, but that should be fine. The code fails when adding a new InputSplit to an ArrayList, which is a standard operation.

Oh, I think I found a bug in `addNestedFiles`: it does not pick up the length of the recursively found files in line 546. That can result in a returned size of 0, which causes infinitely many InputSplits to be created and added to the aforementioned ArrayList. Can you change

addNestedFiles(dir.getPath(), files, length, logExcludedFiles);
to

length += addNestedFiles(dir.getPath(), files, length, logExcludedFiles);

?


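To make the failure mode concrete, here is a minimal standalone sketch of that pattern. This is not Flink's actual code; the class and method names are invented. A recursive enumerator returns the accumulated byte length, and if the caller drops the return value of the recursive call, files that only live in subdirectories are collected but contribute 0 bytes to the total:

import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class NestedLengthDemo {

    // Recursively collects files under 'dir' and returns the total
    // length in bytes accumulated so far.
    static long collectFiles(File dir, List<File> files, long length) {
        File[] entries = dir.listFiles();
        if (entries == null) {
            return length; // not a directory or not readable
        }
        for (File f : entries) {
            if (f.isDirectory()) {
                // Buggy variant: collectFiles(f, files, length);
                // The nested files land in 'files', but the returned
                // length is discarded, so they count as 0 bytes.
                length = collectFiles(f, files, length); // fixed: keep the result
            } else {
                files.add(f);
                length += f.length();
            }
        }
        return length;
    }

    public static void main(String[] args) {
        List<File> files = new ArrayList<>();
        long total = collectFiles(new File("/tmp/myDir"), files, 0L);
        System.out.println(files.size() + " files, " + total + " bytes");
    }
}

With the buggy variant, the layout above finds all 10 files but reports a total length of 0, and a total length of 0 is exactly what lets createInputSplits keep adding splits until the heap is exhausted.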

Re: Recursive directory reading error

Flavio Pompermaier
Yep, that definitely solves the problem! Could you open a PR to fix that?

Thank you in advance,
Flavio

Re: Recursive directory reading error

Maximilian Michels
Pushed a fix to master and will open a PR to fix this properly.
