Getting the name of a file in a directory

pietro
I am reading files from a directory with this statement:

val text = env.readFile(new MyInputFormat(), "/path/to/input/dir/")

MyInputFormat extends the Record API's DelimitedInputFormat, which itself extends DelimitedInputFormat<Record>.

In the output Record, I need to add a field that stores the name (or full path) of the file from which it has been read.

To do that, I wrote the following code in MyInputFormat:


private final StringValue fname = new StringValue();
...
@Override
public Record readRecord(Record reusable, byte[] bytes, int offset, int numBytes) {
    fname.setValue(this.getFilePath().toString());
    reusable.setField(0, this.fname);
    return reusable;
}


The code compiles and runs without problems, but in the output I always get:
/path/to/input/dir/
for every record!
What I wanted to have is:
/path/to/input/dir/file1.txt
/path/to/input/dir/file3.txt
...

Can anyone help me?

Re: Getting the name of a file in a directory

Stephan Ewen
Hi Pietro!

The path of an input format is the directory from which all its input is read. It does not contain the specific paths of the contained files.

The specific path is part of the "FileInputSplit", which describes a subtask of work (like a file, or a part of a file).
If you want the path of the file that a record came from, I would suggest the following:

Override the "open(FileInputSplit)" method in your input format. This method is called once for every file (or subregion of a file, usually many megabytes in size). The FileInputSplit that is passed to the open() method contains the path of the file where the next records come from.

You can get the path from the FileInputSplit, store it in a member variable and then use it to attach it to all records.
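
The pattern above can be sketched without any Flink dependency. This is a minimal model, not Flink code: Split, DirFormat and readAll are hypothetical stand-ins for FileInputSplit, the input format, and the runtime that feeds it splits. In a real format, the equivalent step would be overriding open(FileInputSplit), calling super.open(split), and reading the path from the split.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FileNameSketch {

    /** Hypothetical stand-in for a file split: one file (or part of one) plus its lines. */
    static final class Split {
        final String path;
        final String[] lines;

        Split(String path, String... lines) {
            this.path = path;
            this.lines = lines;
        }
    }

    /** Hypothetical stand-in for an input format configured with a directory. */
    static class DirFormat {
        private final String dirPath;   // the directory; this is what getFilePath() yields
        private String currentFile;     // updated once per split in open()

        DirFormat(String dirPath) {
            this.dirPath = dirPath;
        }

        String getFilePath() {
            return dirPath;
        }

        /** Called once per split, before any record of that split is read. */
        void open(Split split) {
            this.currentFile = split.path;
        }

        /** Called once per line; tags the record with the file remembered by open(). */
        String readRecord(String line) {
            return currentFile + "\t" + line;
        }
    }

    /** Plays the role of the runtime: opens each split, then reads its records. */
    static List<String> readAll(DirFormat format, List<Split> splits) {
        List<String> out = new ArrayList<>();
        for (Split split : splits) {
            format.open(split);
            for (String line : split.lines) {
                out.add(format.readRecord(line));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        DirFormat format = new DirFormat("/path/to/input/dir/");
        List<Split> splits = Arrays.asList(
                new Split("/path/to/input/dir/file1.txt", "a", "b"),
                new Split("/path/to/input/dir/file3.txt", "c"));
        for (String record : readAll(format, splits)) {
            System.out.println(record);
        }
    }
}
```

The member variable is only valid for the split that is currently open, which is why it is refreshed in open() rather than set once in the constructor.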

Greetings,
Stephan

BTW: You are using the Record input format, which is part of the deprecated Record API. We recommend using the input formats based on https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java, which work on data types other than Record.


(In reply to pietro's message of Mon, Mar 2, 2015, 7:27 PM.)

--
View this message in context: http://apache-flink-incubator-user-mailing-list-archive.2336050.n4.nabble.com/Getting-the-name-of-a-file-in-a-directory-tp801.html
Sent from the Apache Flink (Incubator) User Mailing List archive at Nabble.com.