Continuous File monitoring not reading nested files

Yassine MARZOUGUI
Hi all,

I'm using the following code to continuously process files from a directory "mydir".

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FileInputFormat fileInputFormat = new TextInputFormat(new Path("hdfs:///shared/mydir"));
fileInputFormat.setNestedFileEnumeration(true);

env.readFile(fileInputFormat,
                "hdfs:///shared/mydir",
                FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
                .print();

env.execute();

If I add a directory under "mydir", say "2016-12-16", and then add a file "2016-12-16/file.txt", its contents are not printed. If I add the same file directly under "mydir", its contents are printed correctly. After that, the logs show the following:

10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction  - Ignoring hdfs://mlxbackoffice/shared/mydir/2016-12-16, with mod time= 1481882041587 and global mod time= 1481882126122
10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction  - Ignoring hdfs://mlxbackoffice/shared/mydir/file.txt, with mod time= 1481881788704 and global mod time= 1481882126122

It looks like the ContinuousFileMonitoringFunction treats "2016-12-16" as a file it has already read and excludes it, even though its contents were never processed. Any idea why this happens?
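
The two log lines above are consistent with a filter that skips any path whose modification time is not newer than the monitor's "global mod time". Below is a minimal sketch of that reading in plain Java. The class and method names are hypothetical, and the `<=` comparison is an assumption rather than something confirmed from Flink's source:

```java
final class ModTimeFilterSketch {

    /**
     * Hypothetical check: a path is skipped unless it was modified after
     * the newest modification time seen so far (the "global mod time").
     */
    static boolean shouldIgnore(long modTime, long globalModTime) {
        return modTime <= globalModTime;
    }

    public static void main(String[] args) {
        long globalModTime = 1481882126122L; // "global mod time" from the log
        long dirModTime = 1481882041587L;    // mydir/2016-12-16
        long fileModTime = 1481881788704L;   // mydir/file.txt

        // Both values are older than the global mod time, so both paths are skipped.
        System.out.println(shouldIgnore(dirModTime, globalModTime));
        System.out.println(shouldIgnore(fileModTime, globalModTime));
    }
}
```

Under this assumption, a subdirectory created before the last processed file would be skipped as a whole, regardless of what it contains.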
Thank you.

Best,
Yassine

Re: Continuous File monitoring not reading nested files

Yassine MARZOUGUI
It looks like this is not specific to continuous file monitoring. I'm having the same issue (files in nested directories are not read) when using:

env.readFile(fileInputFormat, "hdfs:///shared/mydir", FileProcessingMode.PROCESS_ONCE, -1L)

Re: Continuous File monitoring not reading nested files

Aljoscha Krettek
+kostas, who probably has the most experience with this by now. Do you have an idea what might be going on?

Re: Continuous File monitoring not reading nested files

Yassine MARZOUGUI
Hi,

Any updates on this issue? Thank you.

Best,
Yassine


Re: Continuous File monitoring not reading nested files

Kostas Kloudas
Hi Yassine,

I suspect that the problem is in the way the input format (and not the reader) scans nested files, but could you check whether the nestedFileEnumeration parameter is still true in the code that is executed by the tasks?

I am asking in order to pin down whether the problem is in the way we ship the code to the tasks or in reading the nested files.
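
One way to picture the "shipping" half of this question, assuming the format reaches the tasks through Java serialization, is a round trip that checks whether a boolean flag survives. This is only a sketch: `FakeFormat` is a hypothetical stand-in, not Flink's FileInputFormat.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class ShippedFormatSketch {

    /** Hypothetical stand-in for an input format that carries a nested-enumeration flag. */
    static final class FakeFormat implements Serializable {
        private static final long serialVersionUID = 1L;
        private boolean nestedFileEnumeration;

        void setNestedFileEnumeration(boolean enable) {
            this.nestedFileEnumeration = enable;
        }

        boolean getNestedFileEnumeration() {
            return nestedFileEnumeration;
        }
    }

    /** Serializes and deserializes the format, imitating shipping it to a task. */
    static FakeFormat roundTrip(FakeFormat format) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(format);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (FakeFormat) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FakeFormat format = new FakeFormat();
        format.setNestedFileEnumeration(true);
        // A non-transient field should still be true after the round trip.
        System.out.println(roundTrip(format).getNestedFileEnumeration());
    }
}
```

If the flag is still true on the task side, the shipping path is ruled out and the scanning logic becomes the likely culprit.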

Thanks,
Kostas

Re: Continuous File monitoring not reading nested files

Yassine MARZOUGUI
Hi Kostas,

I debugged the code, and the nestedFileEnumeration parameter was always true during execution. However, I noticed that in the following loop in ContinuousFileMonitoringFunction, for some reason, the fileStatus was null for files in nested folders and non-null for files directly under the parent path, so no splits were forwarded for nested folders.

for (int var5 = 0; var5 < var4; ++var5) {
    FileInputSplit split = var3[var5];
    FileStatus fileStatus = (FileStatus) eligibleFiles.get(split.getPath());
    if (fileStatus != null) {
        Long modTime = Long.valueOf(fileStatus.getModificationTime());
        Object splitsToForward = (List) splitsByModTime.get(modTime);
        if (splitsToForward == null) {
            splitsToForward = new ArrayList();
            splitsByModTime.put(modTime, splitsToForward);
        }

        ((List) splitsToForward).add(new TimestampedFileInputSplit(
                modTime.longValue(), split.getSplitNumber(), split.getPath(),
                split.getStart(), split.getLength(), split.getHostnames()));
    }
}
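
The effect of that null check can be reproduced without Flink. The sketch below uses plain strings for paths and a `Map<String, Long>` in place of the path-to-FileStatus map; all names are hypothetical. It assumes the eligible-files map only contains top-level entries, which is what the debugging session suggests:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SplitGroupingSketch {

    /**
     * Groups split paths by the modification time of their file. A split whose
     * path is missing from eligibleFiles is silently dropped, mirroring the
     * null check in the loop above.
     */
    static Map<Long, List<String>> groupByModTime(
            List<String> splitPaths, Map<String, Long> eligibleFiles) {
        Map<Long, List<String>> splitsByModTime = new TreeMap<>();
        for (String path : splitPaths) {
            Long modTime = eligibleFiles.get(path);
            if (modTime != null) {
                splitsByModTime.computeIfAbsent(modTime, k -> new ArrayList<>()).add(path);
            }
        }
        return splitsByModTime;
    }

    public static void main(String[] args) {
        // Only the top-level file was registered as eligible.
        Map<String, Long> eligibleFiles = new HashMap<>();
        eligibleFiles.put("/shared/mydir/file.txt", 1481881788704L);

        // The input format (with nested enumeration on) produced splits for both files.
        List<String> splits = Arrays.asList(
                "/shared/mydir/file.txt",
                "/shared/mydir/2016-12-16/file.txt");

        // Only the top-level split survives the lookup.
        System.out.println(groupByModTime(splits, eligibleFiles));
    }
}
```

The nested split is created by the input format but never forwarded, because its path has no entry in the map.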

Thanks,
Yassine


Re: Continuous File monitoring not reading nested files

Yassine MARZOUGUI
Hi,

I found the root cause of the problem: the listEligibleFiles method in ContinuousFileMonitoringFunction scans only the top-level files and ignores nested files. After fixing that, I got the expected output. I created a Jira issue: https://issues.apache.org/jira/browse/FLINK-5432.
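
The idea behind the fix is to descend into subdirectories while listing, instead of stopping at the first level. Below is a minimal sketch of that idea on the local filesystem with java.nio.file. It is not the actual patch for FLINK-5432, and the class and method names are hypothetical:

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

final class NestedListingSketch {

    /** Lists regular files under dir; descends into subdirectories only when nested is true. */
    static List<Path> listFiles(Path dir, boolean nested) throws IOException {
        List<Path> files = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) {
                if (Files.isDirectory(entry)) {
                    if (nested) {
                        files.addAll(listFiles(entry, true));
                    }
                } else {
                    files.add(entry);
                }
            }
        }
        return files;
    }

    public static void main(String[] args) throws IOException {
        // Recreate the layout from the thread in a temporary directory.
        Path root = Files.createTempDirectory("mydir");
        Files.createFile(root.resolve("file.txt"));
        Path sub = Files.createDirectory(root.resolve("2016-12-16"));
        Files.createFile(sub.resolve("file.txt"));

        System.out.println(listFiles(root, false).size()); // top-level files only
        System.out.println(listFiles(root, true).size());  // nested files included
    }
}
```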

@Kostas, if you haven't already started working on this, I would happily contribute a fix.

Best,
Yassine

Re: Continuous File monitoring not reading nested files

Aljoscha Krettek
Yes, please go ahead with the fix! :-)

(If I'm not mistaken Kostas is working on other stuff right now.)

Re: Continuous File monitoring not reading nested files

Kostas Kloudas
Aljoscha is right!

Any contribution is more than welcome.

Kostas
