FileInputFormat that processes files in chronological order

spoganshev
Given a directory with input files laid out as follows:

/data/shard1/file1.json
/data/shard1/file2.json
/data/shard1/file3.json
/data/shard2/file1.json
/data/shard2/file2.json
/data/shard2/file3.json

Is there a way to make FileInputFormat with parallelism 2 split processing by "shard" (folder) and then process the files in each shard in chronological order (file1.json, file2.json, file3.json)? Will I have to implement a custom FileInputFormat for that?


Re: FileInputFormat that processes files in chronological order

Averell
Hi,

Regarding splitting by shards, I believe that you can simply create two
sources, one for each shard. After that, union them together.
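Averell's approach needs one source per shard folder. If the shard folders aren't known in advance, they could be discovered from the input root first. Each returned directory would then be passed to its own source (for example `env.readTextFile(shardDir)`), and the resulting streams combined with `DataStream#union`. The sketch below covers only the discovery step, in plain JDK Java. The class and method names are hypothetical (not Flink APIs), and it assumes the root is on a local file system.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper (not a Flink API): finds the shard folders under a local root
// so that one file source can be created per shard.
class ShardDirectories {

    /** Returns the immediate subdirectories of root (the "shards"), sorted by path. */
    static List<String> list(Path root) throws IOException {
        List<String> shards = new ArrayList<>();
        // Only directories pass the filter; stray files directly under root are ignored.
        try (DirectoryStream<Path> children = Files.newDirectoryStream(root, Files::isDirectory)) {
            for (Path child : children) {
                shards.add(child.toString());
            }
        }
        // Directory listing order is unspecified, so sort for a stable shard-to-source mapping.
        Collections.sort(shards);
        return shards;
    }
}
```

Sorting the result keeps the mapping from shard to source deterministic across runs, which matters if you later want a given shard to always land on the same source.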

Regarding processing files in chronological order, Flink currently reads
files in order of their last-modification time (i.e. the oldest files are
processed first). So if file1.json is older than file2.json, and file2.json
is older than file3.json, you don't need to do anything.
If your files' modification times are not in that order, I think it's more
complex. But first, I'm curious why you have this requirement. Is this a
streaming problem?
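Whether a shard already satisfies the "oldest first" condition can be checked by listing its files by last-modified time and comparing that order with the file-name order. The sketch below uses only the JDK. The class name is hypothetical, it assumes a local file system, and files with identical timestamps fall back to name order.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper (not a Flink API) for inspecting a shard directory locally.
class ModTimeOrder {

    /** Regular files directly under dir, oldest last-modified time first (ties: by name). */
    static List<Path> oldestFirst(Path dir) throws IOException {
        try (Stream<Path> entries = Files.list(dir)) {
            return entries
                    .filter(Files::isRegularFile)
                    .sorted(Comparator.comparing((Path p) -> lastModified(p))
                            .thenComparing(p -> p.getFileName().toString()))
                    .collect(Collectors.toList());
        }
    }

    /** True if sorting by modification time gives the same order as sorting by file name. */
    static boolean modTimeMatchesNameOrder(Path dir) throws IOException {
        List<Path> byTime = oldestFirst(dir);
        List<Path> byName = byTime.stream()
                .sorted(Comparator.comparing((Path p) -> p.getFileName().toString()))
                .collect(Collectors.toList());
        return byTime.equals(byName);
    }

    private static FileTime lastModified(Path p) {
        try {
            return Files.getLastModifiedTime(p);
        } catch (IOException e) {
            // Comparators can't throw checked exceptions, so rethrow as unchecked.
            throw new UncheckedIOException(e);
        }
    }
}
```

If `modTimeMatchesNameOrder` returns true for every shard directory, you are in the "nothing to do" case described above.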

I don't think you need a custom FileInputFormat here. You would implement
one when your files are in a format that Flink doesn't already support.

Regards,
Averell  



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: FileInputFormat that processes files in chronological order

Fabian Hueske-2
Hi Sergei,

It depends on whether you want to process the files with the DataSet (batch) or DataStream (streaming) API.
Averell's answer addressed the DataStream API.

The DataSet API has no built-in support for grouping files (or file splits) by folder and processing them in order.
For the DataSet API, you would need to implement a custom InputFormat (based on FileInputFormat) with a custom InputSplitAssigner implementation.
The InputSplitAssigner would need to assign splits to the requesting parallel tasks based on the splits' paths, in the correct order.
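The assignment logic described here can be modeled without Flink. The sketch below is hypothetical (the names are illustrative, not Flink APIs) and assumes that each file is read as a single split (large files are not split further), that a file's shard is its parent folder, that file names sort into chronological order, and that the job's parallelism equals the number of shards. In an actual format, this logic would live in an implementation of Flink's `InputSplitAssigner` interface and work on `FileInputSplit`s (via `getPath()`) rather than strings. That interface's `getNextInputSplit(String host, int taskId)` likewise returns `null` when no splits remain.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Flink-free model of a shard-aware split assigner (hypothetical class, not a Flink API).
 * A "split" here is a whole file path; its shard is the parent folder.
 */
class ShardOrderedAssigner {

    // One FIFO queue per shard; shards are kept in sorted order, so shard i is queues.get(i).
    private final List<Deque<String>> queues = new ArrayList<>();

    ShardOrderedAssigner(List<String> filePaths) {
        Map<String, List<String>> byShard = new TreeMap<>();
        for (String path : filePaths) {
            int slash = path.lastIndexOf('/');
            String shard = slash < 0 ? "" : path.substring(0, slash);
            byShard.computeIfAbsent(shard, k -> new ArrayList<>()).add(path);
        }
        for (List<String> files : byShard.values()) {
            // Lexicographic order: fine for file1..file9, but "file10" would sort before "file2".
            Collections.sort(files);
            queues.add(new ArrayDeque<>(files));
        }
    }

    /**
     * Same contract as getNextInputSplit(host, taskId): returns the next split for the
     * requesting task, or null when none remain. Task i only ever receives shard i, in order;
     * the host is ignored because this assignment is not locality-aware.
     */
    synchronized String nextSplit(String host, int taskId) {
        if (taskId < 0 || taskId >= queues.size()) {
            return null; // more parallel tasks than shards: the extra tasks get no work
        }
        return queues.get(taskId).poll(); // null once this shard is exhausted
    }
}
```

Pinning each task to a single shard trades load balancing for ordering: if one shard is much larger than the other, one task finishes early and sits idle.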

Best,
Fabian

(In reply to Averell's message of Sun, 28 Apr 2019, 08:48.)

Re: FileInputFormat that processes files in chronological order

spoganshev
Why isn't FileInputFormat#getInputSplitAssigner configurable, though? It
would make sense to let users of FileInputFormat set the desired split
assigner (and make LocatableInputSplitAssigner only the default).




Re: FileInputFormat that processes files in chronological order

Fabian Hueske-2
So far, configuring the split assigner hasn't been a common requirement.
You can implement your own format that extends FileInputFormat (or any of its subclasses) and override the getInputSplitAssigner() method.

Best, Fabian

(In reply to spoganshev's message of Mon, 27 May 2019, 15:30.)

Re: FileInputFormat that processes files in chronological order

spoganshev
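I've tried that, but the problem is:
- the return type of FileInputFormat#getInputSplitAssigner is LocatableInputSplitAssigner
- LocatableInputSplitAssigner is final

Unfortunately, this makes it impossible to plug in a different split assigner: an override may only narrow the declared return type, and since the class is final, an override can only ever return another LocatableInputSplitAssigner.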




Re: FileInputFormat that processes files in chronological order

Fabian Hueske-2
I see, that's unfortunate.
Both classes are also annotated with @Public, so their signatures can't be changed in an incompatible way until Flink 2.0.

Nonetheless, feel free to open a Jira issue to improve the situation for a future release.

Best, Fabian

(In reply to spoganshev's message of Mon, 27 May 2019, 16:55.)