Query regarding the ContinuousFileMonitoring operator


Puneet Kinra-2
If I set the parallelism to 1, it still creates multiple splits while processing.

--
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : [hidden email]

e-mail :[hidden email]



Re: Query regarding the ContinuousFileMonitoring operator

Timo Walther
Hi Puneet,

Can you share a small code example with us? I could not reproduce your problem.

Keep in mind that setParallelism() only affects the operator it is called on. If you want to change the default parallelism of the entire pipeline, you have to set it on the StreamExecutionEnvironment. Otherwise every subsequent operator will again run with the full default parallelism, which leads to a shuffle operation after your source.
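
For illustration, here is a minimal sketch of setting the default parallelism on the environment instead of on a single operator. It assumes Flink's DataStream API as used in this thread and needs the Flink streaming dependency on the classpath. The class name, input directory, scan interval, and job name are hypothetical placeholders.

```java
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class SingleParallelismFileJob {

    public static void main(String[] args) throws Exception {
        // Hypothetical input directory; replace with your own path.
        String inputDir = "file:///tmp/input";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Default parallelism for every operator that does not override it.
        env.setParallelism(1);

        TextInputFormat format = new TextInputFormat(new Path(inputDir));
        format.setNestedFileEnumeration(true);

        // Re-scan the directory once per second (the interval is in milliseconds).
        DataStream<String> lines =
            env.readFile(format, inputDir, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000L);

        // The sink inherits the environment default, so it also runs with parallelism 1.
        lines.print();

        env.execute("single-parallelism file job");
    }
}
```

By contrast, calling setParallelism(1) only on the result of readFile() leaves all later operators at the environment's default parallelism.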

I hope this helps.

Regards,
Timo



Re: Query regarding the ContinuousFileMonitoring operator

Puneet Kinra-2
Hi Timo,

    // Read the .csv files under the path, including nested directories.
    FileInputFormat<String> fileInputFormat = new TextInputFormat(new Path(fileSystem + this.path));
    fileInputFormat.setNestedFileEnumeration(true);
    fileInputFormat.setFilesFilter(new UnicaFileFilter(".csv"));
    // The last argument is the scan interval in milliseconds.
    DataStream<String> value = this.execEnv
        .readFile(fileInputFormat, fileSystem + this.path, FileProcessingMode.PROCESS_CONTINUOUSLY, 2L)
        .setParallelism(1);

1) If I set the parallelism to 1, the files are processed sequentially.
2) Modified splits are processed sequentially on the same task manager.
3) I want to move the files after they have been processed. How can I achieve this?





Re: Query regarding the ContinuousFileMonitoring operator

Kostas Kloudas
Hi Puneet,

If you mean that after processing a file, you want to move it to another directory outside the one containing 
the data to be processed, then I am afraid that this is currently not possible. This is because the whole logic 
of how to treat files is included in your FileInputFormat. 

It may be possible if you implement a custom FileInputFormat that creates the splits, moves the file,
and modifies the splits to point to the file's new location before shipping them downstream to be read
(but I have not tried it).

Keep in mind that if you do not change the contents of the file, then it will not be reprocessed.
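
As a rough illustration of the "moves the file" step only, the sketch below uses plain java.nio and assumes the files live on a local file system. ProcessedFileMover and moveToArchive are hypothetical names, not part of Flink, and the integration into a custom FileInputFormat is not shown. The archive directory must lie outside the monitored directory, otherwise the moved file would be seen as new input.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical helper (not a Flink class): moves a file out of the monitored directory. */
final class ProcessedFileMover {

    private ProcessedFileMover() {}

    /**
     * Moves {@code file} into {@code archiveDir}, creating the directory if needed,
     * and returns the new location of the file.
     */
    static Path moveToArchive(Path file, Path archiveDir) {
        try {
            Files.createDirectories(archiveDir);
            Path target = archiveDir.resolve(file.getFileName());
            try {
                // Prefer an atomic rename so that no partially moved file is ever visible.
                return Files.move(file, target, StandardCopyOption.ATOMIC_MOVE);
            } catch (AtomicMoveNotSupportedException e) {
                // For example, when source and target are on different file stores.
                return Files.move(file, target, StandardCopyOption.REPLACE_EXISTING);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A custom input format could call such a helper and then point its splits at the returned path. On a distributed file system, that file system's own rename operation would be needed instead.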

Cheers,
Kostas


Re: Query regarding the ContinuousFileMonitoring operator

Puneet Kinra-2
Hi Kostas

Thanks for the reply. Yes, I am planning to implement exactly that.




