Reading from Multiple Directories with StreamExecutionEnvironment

Jason Liu
Hi all,

I came across this change that allows users to read from multiple file paths in Flink. However, I have a question about how to use this feature with StreamExecutionEnvironment.readFile(). It seems that in readFile(), the input format's filePaths actually get overwritten here. So no matter which FileInputFormat I pass in, its filePaths are later reset to a single directory. Just curious whether I'm missing something here?

Here is some sample code:

// Read an S3 object that contains the list of S3 paths to read from.
final List<String> directoryList =
    getDirectoryList(someClient.getS3ObjectContentAsString(commonBucket, directory.getKey()));

final TextInputFormat inputFormat = new TextInputFormat(new Path(inputBucketProperty));
// toArray(new String[0]) yields a real String[]; casting the Object[] from toArray() would throw ClassCastException.
inputFormat.setFilePaths(directoryList.toArray(new String[0]));
inputFormat.setNestedFileEnumeration(true);

streamEnv
    .readFile(inputFormat, "some path") // readFile() resets the format's paths to this single path
    .addSink(createSink());

streamEnv.execute(getClass().getSimpleName());
This is going to run on Kinesis Data Analytics, if that makes any difference.

Thanks for the help, if any :)
-Jason

Re: Reading from Multiple Directories with StreamExecutionEnvironment

Kostas Kloudas
Hi Jason,

Your analysis seems correct.
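The overwrite can be reproduced in isolation. The sketch below (the class and method names are hypothetical) sets several paths on a TextInputFormat, passes the format to readFile() with a single path, and reports how many paths the format holds afterwards. On the Flink versions discussed here it should report one, because readFile() calls setFilePath() on the format before building the source. Only the job graph is built; nothing is executed or read from disk.

```java
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadFileOverwriteDemo {

    // Returns how many paths the format still holds after being handed to readFile().
    static int pathsAfterReadFile(String... directories) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        TextInputFormat format = new TextInputFormat(new Path(directories[0]));
        // TextInputFormat supports multiple paths, so this stores all of them.
        format.setFilePaths(directories);

        // readFile() calls format.setFilePath(...) internally, replacing the array set above.
        env.readFile(format, "/tmp/some-path");

        return format.getFilePaths().length;
    }

    public static void main(String[] args) {
        System.out.println(pathsAfterReadFile("/tmp/dir-a", "/tmp/dir-b", "/tmp/dir-c"));
    }
}
```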

As an alternative, you could either:
1) call readFile() multiple times on the StreamExecutionEnvironment
(once for each directory you want to monitor) and then union the
resulting streams, or
2) put all the directories you want to monitor under a common parent
directory, specify that as the directory to monitor, and call
setNestedFileEnumeration(true) on the input format so that it searches
recursively.
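Both alternatives can be sketched roughly as follows. This assumes line-oriented text files (hence TextInputFormat), continuous monitoring with a caller-chosen scan interval, and hypothetical names (MultiDirectoryRead, readAllDirectories, readParentDirectory). It relies only on StreamExecutionEnvironment.readFile(format, path, FileProcessingMode, interval) and DataStream.union().

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class MultiDirectoryRead {

    // Builds a TextInputFormat that also descends into subdirectories.
    private static TextInputFormat recursiveTextFormat(String dir) {
        TextInputFormat format = new TextInputFormat(new Path(dir));
        format.setNestedFileEnumeration(true);
        return format;
    }

    // Option 1: one readFile() source per directory, unioned into a single stream.
    static DataStream<String> readAllDirectories(
            StreamExecutionEnvironment env, List<String> directories, long scanIntervalMs) {
        if (directories.isEmpty()) {
            throw new IllegalArgumentException("At least one directory is required");
        }
        DataStream<String> result = null;
        for (String dir : directories) {
            DataStream<String> stream = env.readFile(
                    recursiveTextFormat(dir), dir, FileProcessingMode.PROCESS_CONTINUOUSLY, scanIntervalMs);
            result = (result == null) ? stream : result.union(stream);
        }
        return result;
    }

    // Option 2: a single readFile() source on a common parent directory, scanned recursively.
    static DataStream<String> readParentDirectory(
            StreamExecutionEnvironment env, String parentDir, long scanIntervalMs) {
        return env.readFile(
                recursiveTextFormat(parentDir), parentDir, FileProcessingMode.PROCESS_CONTINUOUSLY, scanIntervalMs);
    }

    public static void main(String[] args) throws Exception {
        // The directories to monitor are passed as program arguments.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        readAllDirectories(env, Arrays.asList(args), 10_000L).print();
        env.execute("multi-directory-read");
    }
}
```

Option 1 adds one monitoring source per directory, so the job graph grows with the size of the directory list; option 2 needs only one source but requires the directories to share a parent.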

As a side note, this will no longer be a problem with the new
FileSource, although that will only be available in the next Flink release.

Cheers,
Kostas

On Tue, Oct 6, 2020 at 2:11 AM Jason Liu wrote:
> [snip]