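Hi all,

I came across this change that allows users to specify multiple file paths to read from in Flink. However, I have a question about how to use this feature with StreamExecutionEnvironment.readFile(). It seems that in readFile, the input filePaths actually get overwritten here. So no matter what FileInputFormat I pass in, the filePaths just get set to a single directory later. Am I missing something?

Here is my sample code:

    // Read from S3 object to get the list of S3 paths.
    final List<String> directoryList =
        getDirectoryList(someClient.getS3ObjectContentAsString(commonBucket, directory.getKey()));

    inputFormat = new TextInputFormat(new Path(inputBucketProperty));
    inputFormat.setFilePaths(directoryList.toArray(new String[0]));
    inputFormat.setNestedFileEnumeration(true);

    streamEnv
        .readFile(inputFormat, "some path")
        .addSink(createSink());

    streamEnv.execute(getClass().getSimpleName());

This is going to run on Kinesis Data Analytics, if that makes any difference.

Thanks for the help, if any :)
-Jason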
Hi Jason,
Your analysis seems correct: readFile() sets the file path on the input format itself, so any paths configured beforehand are replaced.

As an alternative, you could either:

1) call readFile() multiple times on the StreamExecutionEnvironment (once for each directory you want to monitor) and then union the resulting streams, or
2) put all the directories you want to monitor under a common parent directory, specify that parent as the directory to monitor, and call setNestedFileEnumeration(true) on the input format so that it searches recursively.

As a side note, this will no longer be a problem with the new FileSource, although that will only arrive in the next Flink release.

Cheers,
Kostas

On Tue, Oct 6, 2020 at 2:11 AM Jason Liu <[hidden email]> wrote:
>
> Hi all,
>
> I came across this change that allows users to specify multiple file paths to read from in Flink. However, I have a question about how to use this feature with StreamExecutionEnvironment.readFile(). It seems that in readFile, the input filePaths actually get overwritten here. So no matter what FileInputFormat I pass in, the filePaths just get set to a single directory later. Am I missing something?
>
> Here is my sample code:
>
>     // Read from S3 object to get the list of S3 paths.
>     final List<String> directoryList =
>         getDirectoryList(someClient.getS3ObjectContentAsString(commonBucket, directory.getKey()));
>
>     inputFormat = new TextInputFormat(new Path(inputBucketProperty));
>     inputFormat.setFilePaths(directoryList.toArray(new String[0]));
>     inputFormat.setNestedFileEnumeration(true);
>
>     streamEnv
>         .readFile(inputFormat, "some path")
>         .addSink(createSink());
>
>     streamEnv.execute(getClass().getSimpleName());
>
> This is going to run on Kinesis Data Analytics, if that makes any difference.
>
> Thanks for the help, if any :)
> -Jason
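
For readers who want to try the first workaround from Kostas's reply (one readFile() call per directory, then a union), here is a minimal sketch. It is not code from the thread: the class name MultiDirectoryRead, the helper readAll, and the intervalMs parameter are hypothetical names chosen for illustration, and the sketch assumes the Flink 1.11-era DataStream API, where readFile(FileInputFormat, String, FileProcessingMode, long) and TextInputFormat are available.

```java
import java.util.List;

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

// Hypothetical helper class, not part of Flink or of the original post.
public class MultiDirectoryRead {

    /**
     * Creates one monitored file source per directory and unions them
     * into a single stream of text lines.
     */
    static DataStream<String> readAll(
            StreamExecutionEnvironment env, List<String> directories, long intervalMs) {
        if (directories.isEmpty()) {
            throw new IllegalArgumentException("At least one directory is required");
        }

        DataStream<String> result = null;
        for (String dir : directories) {
            // Each source gets its own format instance, because readFile()
            // sets the file path on the format it is given.
            TextInputFormat format = new TextInputFormat(new Path(dir));
            format.setNestedFileEnumeration(true); // also pick up files in subdirectories

            DataStream<String> stream =
                    env.readFile(format, dir, FileProcessingMode.PROCESS_CONTINUOUSLY, intervalMs);

            // Fold the per-directory streams into one.
            result = (result == null) ? stream : result.union(stream);
        }
        return result;
    }
}
```

With a helper like this, the pipeline in the question would become something like readAll(streamEnv, directoryList, 60_000L).addSink(createSink()). For the second workaround, the loop would collapse to a single readFile() call on the common parent directory, still with setNestedFileEnumeration(true) on the format.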