import org.apache.flink.api.java.io.TextInputFormat; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; public class ReadDirectorySSCCE { public static void main(String[] args) throws Exception { // create given dirs and add a .csv file to each one String[] dirs = new String[] {"tmp", "tmp/first/", "tmp/second/"}; for (String dir: dirs) { // create input file File tmpDir = new File(dir); if (!tmpDir.exists()) { tmpDir.mkdirs(); } File tempFile = File.createTempFile("file", ".csv", tmpDir); BufferedWriter w = new BufferedWriter(new FileWriter(tempFile)); w.write("content of " + dir + "/file.csv"); w.close(); tempFile.deleteOnExit(); } File root = new File("tmp"); TextInputFormat inputFormat = new TextInputFormat(new Path(root.toURI().toString())); inputFormat.setNestedFileEnumeration(true); // FileInputSplit[] splits = inputFormat.createInputSplits(1); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.createInput(inputFormat).print(); env.execute(); } }