Fwd: Continuous File monitoring not reading nested files

5 messages

Fwd: Continuous File monitoring not reading nested files

Lukas Kircher
Hi all,

This is probably related to the problem I reported in December. In case it helps, you can find a self-contained example below. I haven't looked deeply into the problem, but it seems like the correct file splits are determined yet somehow not processed. When I read from HDFS, nested files are skipped as well, which is a real problem for me at the moment.

Cheers,
Lukas

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;

public class ReadDirectorySSCCE {
    public static void main(String[] args) throws Exception {
        // create the given dirs and add a .csv file to each one
        String[] dirs = new String[] {"tmp", "tmp/first/", "tmp/second/"};
        for (String dir : dirs) {
            // create the directory if necessary
            File tmpDir = new File(dir);
            if (!tmpDir.exists()) {
                tmpDir.mkdirs();
            }
            // write a small input file into it
            File tempFile = File.createTempFile("file", ".csv", tmpDir);
            BufferedWriter w = new BufferedWriter(new FileWriter(tempFile));
            w.write("content of " + dir + "/file.csv");
            w.close();
            tempFile.deleteOnExit();
        }
        File root = new File("tmp");

        TextInputFormat inputFormat = new TextInputFormat(new Path(root.toURI().toString()));
        inputFormat.setNestedFileEnumeration(true);

        inputFormat.configure(new Configuration());

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.createInput(inputFormat).print();
        env.execute();
    }
}
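For comparison, the same nested layout can be enumerated with plain java.nio, independently of Flink; this is the set of files that setNestedFileEnumeration(true) should make the input format see. A minimal stdlib-only sketch (the class and method names are made up for illustration):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class WalkNestedDirs {
    // recreate the SSCCE layout under root and return all .csv paths found recursively
    static List<Path> createAndWalk(Path root) throws IOException {
        for (String dir : new String[] {"", "first", "second"}) {
            Path d = root.resolve(dir);
            Files.createDirectories(d);
            Files.write(d.resolve("file.csv"),
                    ("content of " + dir + "/file.csv").getBytes());
        }
        // a recursive walk must see all three files, including the nested ones
        try (Stream<Path> paths = Files.walk(root)) {
            return paths.filter(p -> p.toString().endsWith(".csv"))
                        .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("tmp");
        System.out.println(createAndWalk(root).size()); // prints 3
    }
}
```

In the failing runs above, only the single top-level file is emitted, even though a recursive enumeration of the same tree clearly contains three files.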

On 9 Jan 2017, at 12:56, Yassine MARZOUGUI <[hidden email]> wrote:

Hi,

Any updates on this issue? Thank you.

Best,
Yassine


On Dec 20, 2016 6:15 PM, "Aljoscha Krettek" <[hidden email]> wrote:
+kostas, who probably has the most experience with this by now. Do you have an idea what might be going on?

On Fri, 16 Dec 2016 at 15:45 Yassine MARZOUGUI <[hidden email]> wrote:
Looks like this is not specific to the continuous file monitoring; I'm having the same issue (files in nested directories are not read) when using:

env.readFile(fileInputFormat, "hdfs:///shared/mydir", FileProcessingMode.PROCESS_ONCE, -1L)

2016-12-16 11:12 GMT+01:00 Yassine MARZOUGUI <[hidden email]>:
Hi all,

I'm using the following code to continuously process files from a directory "mydir".

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FileInputFormat fileInputFormat = new TextInputFormat(new Path("hdfs:///shared/mydir"));
fileInputFormat.setNestedFileEnumeration(true);

env.readFile(fileInputFormat,
                "hdfs:///shared/mydir",
                FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
                .print();

env.execute();

If I add a directory under "mydir", say "2016-12-16", and then add a file "2016-12-16/file.txt", its contents are not printed. If I add the same file directly under "mydir", its contents are correctly printed. After that, the logs show the following:

10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction  - Ignoring hdfs://mlxbackoffice/shared/mydir/2016-12-16, with mod time= 1481882041587 and global mod time= 1481882126122
10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction  - Ignoring hdfs://mlxbackoffice/shared/mydir/file.txt, with mod time= 1481881788704 and global mod time= 1481882126122

It looks like the ContinuousFileMonitoringFunction treated 2016-12-16 as an already-read file and therefore excluded it, but its contents were never processed. Any idea why this happens?
Thank you.
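The two DEBUG lines above are consistent with a simple watermark-style filter: a path whose modification time is not newer than the global modification time seen so far is skipped. A minimal illustrative reconstruction of that filtering (hypothetical class and method names, not Flink's actual code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative reconstruction of the filtering the DEBUG lines suggest:
// a path is ignored when its modification time is not newer than the
// global modification time already observed.
public class ModTimeFilter {
    long globalModTime = Long.MIN_VALUE;

    // returns the paths that would be processed in this scan, then advances the watermark
    List<String> scan(Map<String, Long> pathToModTime) {
        List<String> accepted = new ArrayList<>();
        long maxSeen = globalModTime;
        for (Map.Entry<String, Long> e : pathToModTime.entrySet()) {
            if (e.getValue() > globalModTime) {
                accepted.add(e.getKey());
                maxSeen = Math.max(maxSeen, e.getValue());
            } // else: "Ignoring <path>, with mod time= ... and global mod time= ..."
        }
        globalModTime = maxSeen;
        return accepted;
    }
}
```

Plugging in the logged values (global mod time 1481882126122, path mod times 1481882041587 and 1481881788704) rejects both paths, which would explain why the directory, and with it the file nested inside it, is never processed once the watermark has advanced past its timestamp.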

Best,
Yassine





Re: Fwd: Continuous File monitoring not reading nested files

Timo Walther
Hi Lukas,

Have you tried setting the parameter "recursive.file.enumeration" to true?

// create a configuration object
Configuration parameters = new Configuration();

// set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true);
If this does not work either, I think this could be a bug. You can open an issue for it and attach your sample code.

Timo


On 09/01/17 at 13:47, Lukas Kircher wrote:
[...]






Re: Continuous File monitoring not reading nested files

Kostas Kloudas
Hi Lukas,

Are you sure that tempFile.deleteOnExit() does not remove the files before the test completes? I am just asking to be sure.

Also, from the code I suppose that you run it locally. I suspect that the problem is in the way the input format scans nested files, but could you check whether, in the code that is executed by the tasks, the nestedFileEnumeration parameter is still true?

I am asking in order to pin down whether the problem is in the way we ship the code to the tasks or in reading the nested files.

Thanks,
Kostas

On Jan 9, 2017, at 2:24 PM, Timo Walther <[hidden email]> wrote:
[...]







Re: Continuous File monitoring not reading nested files

Lukas Kircher
Thanks for your suggestions:

@Timo
1) Regarding the recursive.file.enumeration parameter: I think what counts here is the enumerateNestedFiles parameter in FileInputFormat.java. Calling the setter for enumerateNestedFiles is expected to take precedence over recursive.file.enumeration. Not literally overwrite it; rather, recursive.file.enumeration is simply ignored in that case. This is tested in TextInputFormatTest.testNestedFileRead().
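To make point 1) concrete, here is a minimal stdlib-only mimic of the precedence described above (a hypothetical class for illustration, not Flink's FileInputFormat): once the setter has been used, configure() no longer consults the "recursive.file.enumeration" key.

```java
import java.util.Map;

// Hypothetical sketch of setter-over-configuration-key precedence.
public class NestedEnumerationFlag {
    private boolean enumerateNestedFiles = false;
    private boolean setViaSetter = false;

    public void setNestedFileEnumeration(boolean enable) {
        this.enumerateNestedFiles = enable;
        this.setViaSetter = true;
    }

    // configure() consults the key only if the setter was never used
    public void configure(Map<String, String> parameters) {
        if (!setViaSetter) {
            enumerateNestedFiles = Boolean.parseBoolean(
                    parameters.getOrDefault("recursive.file.enumeration", "false"));
        }
    }

    public boolean isNestedFileEnumeration() {
        return enumerateNestedFiles;
    }
}
```

Under this reading, setting the key in the Configuration cannot change anything in the SSCCE above, since setNestedFileEnumeration(true) was already called.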

@Kostas
2) tempFile.deleteOnExit(): If I remove that line I get the same result; only the content of the file in the top-level tmp directory is printed. I derived the SSCCE from my real use case, where I encountered the problem originally; I don't touch the input files there in any way.

3) The given example is run locally. In TextInputFormat.readRecord(String, byte[], int, int) the nestedFileEnumeration parameter is true during execution. Is this what you meant?

Cheers,
Lukas

On 9 Jan 2017, at 14:56, Kostas Kloudas <[hidden email]> wrote:
[...]








Re: Continuous File monitoring not reading nested files

Kostas Kloudas
Yes, thanks for the effort. I will look into it.

Kostas

On Jan 9, 2017, at 4:24 PM, Lukas Kircher <[hidden email]> wrote:
[...]