Recursive directory traversal with TextInputFormat


Lukas Kircher
Hi all,

I am trying to read nested .csv files from a directory and want to switch from a custom SourceFunction I implemented to the TextInputFormat. I have two questions:

1) Somehow only the file in the root directory is processed; nested files are skipped. What am I missing? See the attachment for an SSCCE. I get the same result with Flink 1.1.3, no matter whether I run it in the IDE or submit the job to the standalone binary. The file input splits are all there, yet they don't seem to be processed.

2) What is the easiest way to read compressed .csv files (.zip)?

Thanks for your help, cheers
Lukas


Attachment: ReadDirectorySSCCE.java (1K)

Re: Recursive directory traversal with TextInputFormat

Stefan Richter
Hi,

I think there is a field in FileInputFormat (which TextInputFormat subclasses) that could serve your purpose if you override the default:

/**
* The flag to specify whether recursive traversal of the input directory
* structure is enabled.
*/
protected boolean enumerateNestedFiles = false;
As for compression, I think this class also provides an InflaterInputStreamFactory to read compressed data.

Best,
Stefan
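[Editor's note: a hedged sketch, not from the original thread. Flink's InflaterInputStreamFactory mechanism handles stream compression formats (e.g. deflate/gzip), but .zip is an archive format containing multiple entries, so it may need to be unpacked manually before Flink reads it. A minimal JDK-only sketch of pulling the text out of all .csv entries in a zip, using only java.util.zip (class and method names here are illustrative, not Flink API):]

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

public class ZipCsvReader {

    /** Reads every .csv entry from the zip stream and returns the concatenated text. */
    static String readCsvEntries(ZipInputStream zip) throws Exception {
        StringBuilder out = new StringBuilder();
        for (ZipEntry e; (e = zip.getNextEntry()) != null; ) {
            if (e.isDirectory() || !e.getName().endsWith(".csv")) {
                continue;  // skip directories and non-csv entries
            }
            byte[] buf = new byte[4096];
            for (int n; (n = zip.read(buf)) > 0; ) {
                out.append(new String(buf, 0, n, StandardCharsets.UTF_8));
            }
        }
        return out.toString();
    }

    /** Builds a small in-memory zip with one .csv and one non-csv entry, then reads it back. */
    static String demo() throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ZipOutputStream zos = new ZipOutputStream(bytes)) {
            zos.putNextEntry(new ZipEntry("data.csv"));
            zos.write("a,b,c\n".getBytes(StandardCharsets.UTF_8));
            zos.closeEntry();
            zos.putNextEntry(new ZipEntry("readme.txt"));  // should be skipped
            zos.write("ignored".getBytes(StandardCharsets.UTF_8));
            zos.closeEntry();
        }
        return readCsvEntries(new ZipInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());  // a,b,c
    }
}
```

The resulting text could then be fed to Flink (e.g. via fromCollection after splitting into lines), sidestepping the archive format entirely.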



Re: Recursive directory traversal with TextInputFormat

Lukas Kircher
Hi Stefan,

thanks for your answer.

> I think there is a field in FileInputFormat (which TextInputFormat is subclassing) that could serve your purpose if you override the default:

That was my first guess as well. I use the basic setup from org.apache.flink.api.java.io.TextInputFormatTest.java and call setNestedFileEnumeration(true), but once the stream is processed only the content of the .csv file in the top-most folder is printed. The example is just a few lines of self-contained code, see below. Does anybody have an idea?

Cheers,
Lukas


import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;

public class ReadDirectorySSCCE {
    public static void main(String[] args) throws Exception {
        // create given dirs and add a .csv file to each one
        String[] dirs = new String[] {"tmp", "tmp/first/", "tmp/second/"};
        for (String dir: dirs) {
            // create input file
            File tmpDir = new File(dir);
            if (!tmpDir.exists()) {
                tmpDir.mkdirs();
            }
            File tempFile = File.createTempFile("file", ".csv", tmpDir);
            BufferedWriter w = new BufferedWriter(new FileWriter(tempFile));
            w.write("content of " + dir + "/file.csv");
            w.close();
            tempFile.deleteOnExit();
        }
        File root = new File("tmp");

        TextInputFormat inputFormat = new TextInputFormat(new Path(root.toURI().toString()));
        inputFormat.setNestedFileEnumeration(true);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.createInput(inputFormat).print();
        env.execute();
    }
}
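[Editor's note: a JDK-only cross-check, not from the original thread. The SSCCE above creates one .csv per directory; a plain recursive walk with java.nio (independent of Flink, names illustrative) confirms that a nested traversal of this layout should yield three files, matching the observation that the input splits are all present:]

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class WalkCheck {

    /** Recursively collects all regular .csv files under root. */
    static List<Path> csvFiles(Path root) throws IOException {
        try (Stream<Path> s = Files.walk(root)) {
            return s.filter(Files::isRegularFile)
                    .filter(p -> p.toString().endsWith(".csv"))
                    .collect(Collectors.toList());
        }
    }

    /** Recreates the SSCCE's layout (root, root/first, root/second) under a temp dir. */
    static int demoCount() throws IOException {
        Path root = Files.createTempDirectory("tmp");
        for (String dir : new String[] {"", "first", "second"}) {
            Path d = Files.createDirectories(root.resolve(dir));
            Files.write(d.resolve("file.csv"), "content".getBytes());
        }
        return csvFiles(root).size();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demoCount());  // 3
    }
}
```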



Re: Recursive directory traversal with TextInputFormat

Ufuk Celebi
Looping in Kostas, who recently worked on the continuous file inputs.

@Kostas: do you have an idea what's happening here?

– Ufuk
