Hi all,
I am trying to read nested .csv files from a directory and want to switch from a custom SourceFunction I implemented to the TextInputFormat. I have two questions:

1) Somehow only the file in the root directory is processed; nested files are skipped. What am I missing? See the attachment for an SSCCE. I get the same result with Flink 1.1.3, no matter whether I run it via the IDE or submit the job to the standalone binary. The file input splits are all there, yet they don't seem to be processed.

2) What is the easiest way to read compressed .csv files (.zip)?

Thanks for your help, cheers
Lukas

Attachment: ReadDirectorySSCCE.java (1K)
Hi,
I think there is a field in FileInputFormat (which TextInputFormat is subclassing) that could serve your purpose if you override the default:

/**
 * The flag to specify whether recursive traversal of the input directory
 * structure is enabled.
 */
protected boolean enumerateNestedFiles = false;

As for compression, I think this class also provides an InflaterInputStreamFactory to read compressed data.

Best,
Stefan
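(On the compression point: Flink's FileInputFormat keeps a registry of InflaterInputStreamFactory implementations keyed by file extension, and .gz/.deflate are decompressed transparently; .zip is an archive format, so as far as I know it is not covered out of the box. To illustrate what such a factory does under the hood, here is a plain-JDK sketch that reads a gzipped CSV — no Flink dependency, the class name is just for illustration:)

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipCsvSketch {
    public static void main(String[] args) throws Exception {
        // write a small gzipped CSV into memory
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(buf)) {
            gz.write("a,1\nb,2\n".getBytes(StandardCharsets.UTF_8));
        }

        // reading side: wrap the raw byte stream in a GZIPInputStream;
        // this wrapping step is what an InflaterInputStreamFactory-style
        // hook performs for the input format based on the file extension
        try (BufferedReader r = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new ByteArrayInputStream(buf.toByteArray())),
                StandardCharsets.UTF_8))) {
            String line;
            while ((line = r.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}
```

(Prints the two CSV lines, "a,1" and "b,2". For .zip specifically, java.util.zip.ZipInputStream would be the analogous JDK wrapper, but it additionally iterates archive entries, which is why a plain inflater factory does not map onto it directly.)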
Hi Stefan,

thanks for your answer. That was my first guess as well. I use the basic setup from org.apache.flink.api.java.io.TextInputFormatTest.java and call setNestedFileEnumeration(true), but once the stream is processed only the content of the .csv file in the top-most folder is printed. The example is just a few lines of self-contained code, see below. Does anybody have an idea?

Cheers,
Lukas

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;

public class ReadDirectorySSCCE {
    public static void main(String[] args) throws Exception {
        // create the given dirs and add a .csv file to each one
        String[] dirs = new String[] {"tmp", "tmp/first/", "tmp/second/"};
        for (String dir : dirs) {
            // create input file
            File tmpDir = new File(dir);
            if (!tmpDir.exists()) {
                tmpDir.mkdirs();
            }
            File tempFile = File.createTempFile("file", ".csv", tmpDir);
            BufferedWriter w = new BufferedWriter(new FileWriter(tempFile));
            w.write("content of " + dir + "/file.csv");
            w.close();
            tempFile.deleteOnExit();
        }
        File root = new File("tmp");

        TextInputFormat inputFormat = new TextInputFormat(new Path(root.toURI().toString()));
        inputFormat.setNestedFileEnumeration(true);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.createInput(inputFormat).print();
        env.execute();
    }
}
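(Independent of Flink, it can help to confirm what the directory actually contains before blaming the enumeration flag. A plain-JDK recursive walk — a hypothetical helper, not part of the SSCCE — lists every .csv under tmp, which is exactly the set of files setNestedFileEnumeration(true) should make TextInputFormat pick up:)

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ListNestedCsv {
    public static void main(String[] args) throws IOException {
        // create the same layout as the SSCCE: one .csv each in tmp, tmp/first, tmp/second
        for (String dir : new String[] {"tmp", "tmp/first", "tmp/second"}) {
            Path d = Files.createDirectories(Paths.get(dir));
            Files.write(d.resolve("file.csv"), ("content of " + dir).getBytes());
        }

        // recursively enumerate all .csv files, including nested ones
        try (Stream<Path> walk = Files.walk(Paths.get("tmp"))) {
            List<Path> csvFiles = walk
                    .filter(p -> p.toString().endsWith(".csv"))
                    .sorted()
                    .collect(Collectors.toList());
            csvFiles.forEach(System.out::println);
            // all three files are on disk; if the Flink job prints only one,
            // the nested-enumeration setting is not reaching the split enumerator
            System.out.println("found " + csvFiles.size() + " csv files");
        }
    }
}
```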
|
Looping in Kostas who recently worked on the continuous file inputs.
@Kostas: do you have an idea what's happening here?

– Ufuk

On 8 December 2016 at 08:43:32, Lukas Kircher ([hidden email]) wrote:
> That was my first guess as well. I use the basic setup from org.apache.flink.api.java.io.TextInputFormatTest.java
> and call setNestedFileEnumeration(true), but once the stream is processed only the
> content of the .csv file in the top-most folder is printed.
|