Given a directory with input files in the following layout:

/data/shard1/file1.json
/data/shard1/file2.json
/data/shard1/file3.json
/data/shard2/file1.json
/data/shard2/file2.json
/data/shard2/file3.json

Is there a way to make FileInputFormat with parallelism 2 split processing by "shard" (folder) and then process the files in chronological order (file1.json, file2.json, file3.json) within each shard? Will I have to implement a custom FileInputFormat for that? |
Hi,
Regarding splitting by shards, I believe you can simply create two sources, one for each shard, and then union them.

Regarding processing files in chronological order, Flink currently reads files in last-modified-time order (i.e. the oldest files are processed first). So if file1.json is older than file2.json, and file2.json is older than file3.json, you don't need to do anything. If your file times are not in that order, then I think it's more complex. But I am curious why there is such a requirement in the first place. Is this a streaming problem?

I don't think a custom FileInputFormat is needed here. Use one when your files are in a format not currently supported by Flink.

Regards,
Averell

-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
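To check whether the "you don't need to do anything" case from the reply above applies to a given shard, one could compare file-name order against last-modified order. The following is only a minimal sketch in plain Java, not Flink code; the class `ModTimeCheckSketch` and its methods are hypothetical names introduced for illustration, and it assumes that plain lexicographic name order is the intended chronological order.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Illustrative helper (hypothetical, not part of Flink). */
class ModTimeCheckSketch {

    /** A file name paired with its last-modified timestamp in epoch millis. */
    static final class Entry {
        final String name;
        final long modified;

        Entry(String name, long modified) {
            this.name = name;
            this.modified = modified;
        }
    }

    /**
     * Returns true if sorting the entries by name yields timestamps that never
     * decrease, i.e. name order agrees with oldest-first order.
     * Note: names are compared lexicographically, so "file10" sorts before "file2".
     */
    static boolean nameOrderMatchesModTime(List<Entry> entries) {
        List<Entry> byName = new ArrayList<>(entries);
        byName.sort(Comparator.comparing((Entry e) -> e.name));
        for (int i = 1; i < byName.size(); i++) {
            if (byName.get(i - 1).modified > byName.get(i).modified) {
                return false;
            }
        }
        return true;
    }

    /** Collects name and timestamp of every regular file directly inside dir. */
    static List<Entry> scan(File dir) {
        List<Entry> out = new ArrayList<>();
        File[] files = dir.listFiles(File::isFile);
        if (files != null) {
            for (File f : files) {
                out.add(new Entry(f.getName(), f.lastModified()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Pass a shard directory such as /data/shard1; defaults to the current directory.
        File dir = new File(args.length > 0 ? args[0] : ".");
        System.out.println(nameOrderMatchesModTime(scan(dir)));
    }
}
```

If the check returns false for a shard, the file timestamps would have to be fixed, or the ordering enforced some other way, before relying on last-modified order.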
Hi Sergei,

It depends on whether you want to process the files with the DataSet (batch) or the DataStream (stream) API. Averell's answer addressed the DataStream API part.

The DataSet API does not have any built-in support for distinguishing files (or file splits) by folder and processing them in order. For the DataSet API, you would need to implement a custom InputFormat (based on FileInputFormat) with a custom InputSplitAssigner implementation. The InputSplitAssigner would need to assign splits to hosts based on their path and in the correct order.

Best,
Fabian

On Sun, 28 Apr 2019 at 08:48, Averell <[hidden email]> wrote: Hi, [...] |
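The assignment logic such a custom split assigner would need can be sketched independently of Flink. The following is a minimal plain-Java illustration, not an InputSplitAssigner implementation: the class `ShardAssignmentSketch` and its methods are hypothetical, it assumes one split per file, and it assumes file-name order equals chronological order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/** Illustrative only (hypothetical): groups files by shard and orders them per shard. */
class ShardAssignmentSketch {

    /** The shard is the parent directory, e.g. "/data/shard1" for "/data/shard1/file1.json". */
    static String shardOf(String path) {
        int slash = path.lastIndexOf('/');
        // Paths without a directory part fall into a single unnamed shard.
        return slash < 0 ? "" : path.substring(0, slash);
    }

    /** Groups paths by shard; shards and the files within each shard are sorted by name. */
    static SortedMap<String, List<String>> groupByShard(Collection<String> paths) {
        SortedMap<String, List<String>> shards = new TreeMap<>();
        for (String p : paths) {
            shards.computeIfAbsent(shardOf(p), k -> new ArrayList<>()).add(p);
        }
        for (List<String> files : shards.values()) {
            Collections.sort(files);
        }
        return shards;
    }

    /**
     * Assigns whole shards round-robin to subtasks, so a shard is never split
     * across subtasks. result.get(i) is the ordered work queue of subtask i.
     */
    static List<List<String>> assign(Collection<String> paths, int parallelism) {
        List<List<String>> queues = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            queues.add(new ArrayList<>());
        }
        int shardIndex = 0;
        for (List<String> files : groupByShard(paths).values()) {
            queues.get(shardIndex % parallelism).addAll(files);
            shardIndex++;
        }
        return queues;
    }

    public static void main(String[] args) {
        List<String> paths = Arrays.asList(
                "/data/shard2/file3.json", "/data/shard1/file2.json",
                "/data/shard1/file1.json", "/data/shard2/file1.json",
                "/data/shard1/file3.json", "/data/shard2/file2.json");
        List<List<String>> queues = assign(paths, 2);
        for (int i = 0; i < queues.size(); i++) {
            System.out.println("subtask " + i + ": " + queues.get(i));
        }
    }
}
```

With more shards than subtasks, a subtask works through one shard completely before starting its next one. A real assigner would also have to hand out splits on request and deal with locality, which this sketch leaves out.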
Why is FileInputFormat#getInputSplitAssigner not configurable though? It would make sense to let users of FileInputFormat set the desired split assigner (and keep LocatableInputSplitAssigner as just the default). |
Configuring the split assigner hasn't been a common requirement so far. You can just implement your own format extending FileInputFormat (or any of its subclasses) and override the getInputSplitAssigner() method.

Best,
Fabian

On Mon, 27 May 2019 at 15:30, spoganshev <[hidden email]> wrote: Why is FileInputFormat#getInputSplitAssigner not configurable though? It [...] |
I've tried that, but the problem is:
- The return type of FileInputFormat#getInputSplitAssigner is LocatableInputSplitAssigner
- LocatableInputSplitAssigner is final

Unfortunately, this makes it impossible to override the split assigner. |
I see, that's unfortunate. Both classes are also annotated with @Public, which means they cannot be changed before Flink 2.0. Nonetheless, feel free to open a Jira issue to improve the situation for a future release.

Best,
Fabian

On Mon, 27 May 2019 at 16:55, spoganshev <[hidden email]> wrote: I've tried that, but the problem is: [...] |