Create stream from multiple files using "env.readFile(format, input1, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, FilePathFilter.createDefaultFilter())" ?


Anchit Jatana
Hi All,

I have a use case where I need to create multiple source streams from multiple files and monitor the files for changes using "FileProcessingMode.PROCESS_CONTINUOUSLY".

The intention is to have a monitored stream for each of the files, something like this:

DataStream<String> stream1 = env.readFile(format, input1, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, FilePathFilter.createDefaultFilter());

DataStream<String> stream2 = env.readFile(format, input2, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, FilePathFilter.createDefaultFilter());

DataStream<String> stream3 = env.readFile(format, input3, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, FilePathFilter.createDefaultFilter());

...

DataStream<String> streamN = env.readFile(format, inputN, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, FilePathFilter.createDefaultFilter());

Since this implementation doesn't work, can someone suggest how this can be achieved?


PS: The main intention is to 'monitor' all the files and stream the updated content whenever any of them changes.


Thank you!

Regards,

Anchit
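For readers following the thread, the goal in the PS (watch several files and pick up whichever one changed) can be illustrated outside Flink with a minimal plain-Java sketch. It polls each file's modification timestamp and reports the files whose timestamp differs from the previous poll. The class name ModifiedFilePoller and its poll method are hypothetical names made up for this illustration; this is not Flink's implementation of PROCESS_CONTINUOUSLY and does not explain why the code above fails.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Flink): remembers the last modification
// time seen for each file and reports which files changed since the last poll.
final class ModifiedFilePoller {
    // Last modification time (epoch millis) observed per file.
    private final Map<Path, Long> lastSeen = new HashMap<>();

    // Returns the files that are new to the poller or whose modification
    // time differs from the one recorded by the previous call.
    List<Path> poll(List<Path> files) throws IOException {
        List<Path> changed = new ArrayList<>();
        for (Path file : files) {
            long modified = Files.getLastModifiedTime(file).toMillis();
            Long previous = lastSeen.put(file, modified);
            if (previous == null || previous != modified) {
                changed.add(file);
            }
        }
        return changed;
    }
}
```

Calling poll on a fixed schedule (for example every 1000 ms, matching the interval argument used above) and re-reading the returned files would give one combined "changed files" feed rather than one stream per file.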

Re: Create stream from multiple files using "env.readFile(format, input1, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, FilePathFilter.createDefaultFilter())" ?

Aljoscha Krettek
Hi,
How does "doesn't work" manifest?

Cheers,
Aljoscha

On Wed, 28 Sep 2016 at 22:54 Anchit Jatana <[hidden email]> wrote: