Question about setNestedFileEnumeration()

Question about setNestedFileEnumeration()

Billy Bain
I have a streaming process that reads from S3, where new directories are added daily:

s3://foo/bar/2021-01-18/data.gz
s3://foo/bar/2021-01-19/data.gz
s3://foo/bar/2021-01-20/data.gz
 
When I start the process, it only picks up the directories that already exist at that point; directories added later are never processed.

The TextInputFormat has nested file enumeration enabled:
textInputFormat.setNestedFileEnumeration(true);

DataStreamSource<String> lines = env.readFile(textInputFormat, inputPath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000);
-- 
Wayne D. Young
aka Billy Bob Bain
[hidden email]

Re: Question about setNestedFileEnumeration()

Billy Bain
I sent this a little prematurely. Will the streaming process find new directories under the parent?

If the input path is 
    s3://foo.bar/
and new directories are added daily, should I expect the newly added directories and their files to be processed?

Thanks!

Wayne

On 2021/01/21 23:20:41, Billy Bain <[hidden email]> wrote:

> I have a Streaming process where new directories are added daily in S3.
> [...]

Re: Question about setNestedFileEnumeration()

Matthias
Hi Wayne,
based on another mailing list discussion ([1]), you can assume that combining FileProcessingMode.PROCESS_CONTINUOUSLY with FileInputFormat.setNestedFileEnumeration(true) should work as you expect.
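
To make "should work as you expect" concrete, here is a simplified, plain-Java model of continuous monitoring with nested enumeration. It is an illustration under stated assumptions, not Flink's implementation: the class `NestedDirectoryMonitor` and its `scan()` method are hypothetical, and the real source differs in detail. Each call to `scan()` stands in for one monitoring pass (the `1000` ms interval in the original `readFile` call). It walks the whole tree under the root when nested enumeration is on, emits regular files newer than the newest modification time seen so far, and then advances that cutoff.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

/**
 * Hypothetical, simplified model of a continuously monitored file source
 * (an illustration only, not Flink's source code). Each scan() is one pass.
 */
class NestedDirectoryMonitor {
    private final Path root;
    private final boolean nestedEnumeration;
    // Newest modification time emitted so far; files not newer than this are skipped.
    private long cutoffMillis = Long.MIN_VALUE;

    NestedDirectoryMonitor(Path root, boolean nestedEnumeration) {
        this.root = root;
        this.nestedEnumeration = nestedEnumeration;
    }

    /** Returns the files that are new since the previous pass, sorted by path. */
    List<Path> scan() throws IOException {
        // In this model, without nested enumeration only the root's direct
        // children are listed, so files inside dated subdirectories are never seen.
        int maxDepth = nestedEnumeration ? Integer.MAX_VALUE : 1;
        List<Path> fresh = new ArrayList<>();
        long newest = cutoffMillis;
        try (Stream<Path> paths = Files.walk(root, maxDepth)) {
            for (Path p : (Iterable<Path>) paths::iterator) {
                if (!Files.isRegularFile(p)) {
                    continue; // directories are traversed, never emitted
                }
                long mtime = Files.getLastModifiedTime(p).toMillis();
                if (mtime > cutoffMillis) {
                    fresh.add(p);
                    newest = Math.max(newest, mtime);
                }
            }
        }
        cutoffMillis = newest; // advance the cutoff for the next pass
        fresh.sort(null);      // natural Path order, for deterministic output
        return fresh;
    }
}
```

In this model a new dated subdirectory is discovered on the next pass, while a file whose modification time is not newer than the cutoff (for example, one copied with an older preserved timestamp) is skipped. Checking the modification times of the newly added S3 objects is one way to rule that case out.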

Can you provide more context on your issue, such as log files? Which Flink version are you using? Have you checked whether the same behavior occurs when reading from a local directory?

Best,
Matthias


On Fri, Jan 22, 2021 at 2:32 AM Billy Bain <[hidden email]> wrote:
> I sent this a little prematurely. Will the streaming process find new directories under the parent?
> [...]

Re: Question about setNestedFileEnumeration()

Piotr Nowojski
In reply to the original post by Billy Bain
Hi Billy,

Could you share a minimal program that reproduces the problem? I would suggest starting with a trivial application that reads from local files.
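
Following that suggestion, the daily S3 layout can be mimicked on a local disk. The helper below is a hypothetical sketch (the class `DailyDirectoryWriter` and method `writeDay` are illustrative names, not part of Flink): it creates `<parent>/<yyyy-MM-dd>/data.gz` containing gzip-compressed text lines, matching the file names from the original post.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.LocalDate;
import java.util.zip.GZIPOutputStream;

/** Hypothetical helper for a local reproduction of the daily S3 layout. */
final class DailyDirectoryWriter {
    private DailyDirectoryWriter() {}

    /** Writes the given lines, gzip-compressed, to parent/<ISO date>/data.gz. */
    static Path writeDay(Path parent, LocalDate day, String... lines) throws IOException {
        // LocalDate.toString() yields the ISO form, e.g. 2021-01-19
        Path dir = Files.createDirectories(parent.resolve(day.toString()));
        Path file = dir.resolve("data.gz");
        try (OutputStream out = new GZIPOutputStream(Files.newOutputStream(file))) {
            for (String line : lines) {
                out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            }
        }
        return file;
    }
}
```

For a reproduction, one could write one day before starting the job, point the same `readFile(...)` call at a `file://` path to the parent directory, and then call `writeDay` for the next date while the job is running to see whether the new subdirectory gets read. Writing uncompressed files instead would additionally rule out the `.gz` handling as a factor.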

Best,
Piotrek

On Fri, 22 Jan 2021 at 00:21, Billy Bain <[hidden email]> wrote:
> I have a Streaming process where new directories are added daily in S3.
> [...]