Distribute Parallelism/Tasks within RichOutputFormat?


Hailu, Andreas

Hi folks,

 

I’ve got a single RichOutputFormat that is composed of two HadoopOutputFormats, let’s call them A and B, each writing to a different HDFS directory. If a Record matches a certain condition, it’s written using A; otherwise it’s written using B. Currently, the parallelism set on the RichOutputFormat seems to propagate to both A and B: if the parallelism on the RichOutputFormat is 10, outputs A and B each create 10 files, even if A receives all the records and B receives none.
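To make the effect concrete, here is a toy model in plain Java. It has no Flink dependency, and `CombinedFormatModel` and its round-robin distribution of records are assumptions for illustration, not Flink internals. It assumes each parallel instance of the combined format opens both wrapped formats in `open()`, so every instance leaves one part file in A's directory and one in B's, whether or not it received matching records.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Flink code: every parallel instance of the combined format
// opens BOTH wrapped formats, so each instance leaves one file for A and one for B.
class CombinedFormatModel {

    /** Returns {filesForA, filesForB, emptyFiles} for the given routing decisions. */
    static int[] simulate(List<Boolean> goesToA, int parallelism) {
        int[] recordsA = new int[parallelism];
        int[] recordsB = new int[parallelism];
        // Assumption: records reach the instances round-robin, regardless of A/B.
        for (int i = 0; i < goesToA.size(); i++) {
            int subtask = i % parallelism;
            if (goesToA.get(i)) {
                recordsA[subtask]++;
            } else {
                recordsB[subtask]++;
            }
        }
        int empty = 0;
        for (int s = 0; s < parallelism; s++) {
            if (recordsA[s] == 0) empty++; // A's file from this instance stays empty
            if (recordsB[s] == 0) empty++; // B's file from this instance stays empty
        }
        // One file per instance per wrapped format, whether or not it got data.
        return new int[] {parallelism, parallelism, empty};
    }

    public static void main(String[] args) {
        List<Boolean> allToA = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            allToA.add(true);
        }
        int[] r = simulate(allToA, 10);
        System.out.printf("A files=%d, B files=%d, empty=%d%n", r[0], r[1], r[2]);
    }
}
```

With every record routed to A and a parallelism of 10, the model reports 10 files for each output and 10 empty files, all of them under B.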

 

My app knows the ratio of records it expects to send to output A versus output B, and ideally I would like to pass that down through the RichOutputFormat. That is, if the parallelism is 10 and we know that 70% of the Records go to A, I would like to give A a parallelism of 7 and B a parallelism of 3.
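The 7/3 split can be derived from the expected ratio with a small helper. This is a hypothetical sketch (`ParallelismSplit` is not a Flink API): it rounds to the nearest integer and clamps the result so that each output keeps at least one subtask, since a Flink operator's parallelism must be positive.

```java
// Hypothetical helper, not a Flink API: splits a total parallelism between
// two sinks according to the expected share of records going to A.
class ParallelismSplit {

    /** Returns {parallelismA, parallelismB}; each sink gets at least 1. */
    static int[] split(int total, double shareA) {
        if (total < 2) {
            throw new IllegalArgumentException("need at least one subtask per sink");
        }
        if (shareA < 0.0 || shareA > 1.0) {
            throw new IllegalArgumentException("shareA must be in [0, 1]");
        }
        int a = (int) Math.round(total * shareA);
        // Clamp so that neither sink ends up with zero subtasks.
        a = Math.max(1, Math.min(total - 1, a));
        return new int[] {a, total - a};
    }

    public static void main(String[] args) {
        int[] p = split(10, 0.7);
        System.out.println("A=" + p[0] + ", B=" + p[1]);
    }
}
```

For example, `split(10, 0.7)` gives 7 for A and 3 for B, while `split(10, 1.0)` still leaves B one subtask rather than zero.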

 

I’m curious because the current approach can lead to lots of redundant empty files, and I’d like to minimize that if possible. Is something like this supported?

 

____________

 

Andreas Hailu

Data Lake Engineering | Goldman Sachs & Co.

 




Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices

Re: Distribute Parallelism/Tasks within RichOutputFormat?

Flavio Pompermaier
I'm not an expert on the streaming APIs, but you could try something like this:

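// ds is your existing input stream; goesToA(Row) stands for your A/B condition (hypothetical name)
DataStream<Row> ds1 = ds.filter(row -> !goesToA(row)).setParallelism(3); // records for B
DataStream<Row> ds2 = ds.filter(row -> goesToA(row)).setParallelism(7);  // records for A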
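One detail worth checking with this approach: `setParallelism` on a filter applies only to the filter operator, and the sink attached afterwards (for example with `writeUsingOutputFormat`) has its own parallelism, which typically defaults to the job's default parallelism. So the 7 and 3 would likely need to be set on each sink as well. The plain-Java model below (no Flink dependency; `SplitSinksModel` is hypothetical) illustrates why one sink per output helps: each sink writes one part file per instance, so the total becomes 7 + 3 = 10 files instead of 10 + 10.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Toy model of "filter first, then one sink per output" (no Flink dependency).
class SplitSinksModel {

    /** {files, emptyFiles} for one sink whose instances each write one file. */
    static int[] filesFor(List<?> records, int parallelism) {
        // Assumption: records are spread round-robin, so an instance is empty
        // only when there are fewer records than instances.
        int empty = Math.max(0, parallelism - records.size());
        return new int[] {parallelism, empty};
    }

    static <T> int[][] run(List<T> input, Predicate<T> goesToA, int parA, int parB) {
        List<T> a = input.stream().filter(goesToA).collect(Collectors.toList());
        List<T> b = input.stream().filter(goesToA.negate()).collect(Collectors.toList());
        return new int[][] {filesFor(a, parA), filesFor(b, parB)};
    }

    public static void main(String[] args) {
        // 1,000 stand-in records; 70% satisfy the (hypothetical) A condition.
        List<Integer> input = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
        int[][] r = run(input, i -> i % 10 < 7, 7, 3);
        System.out.printf("A: %d files (%d empty), B: %d files (%d empty)%n",
                r[0][0], r[0][1], r[1][0], r[1][1]);
    }
}
```

With 1,000 records of which 70% match A, the model reports 7 files for A and 3 for B, none of them empty.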

Would that fit your needs?

Best,
Flavio

On Wed, Dec 23, 2020 at 3:54 AM Hailu, Andreas [Engineering] <[hidden email]> wrote:


Re: Distribute Parallelism/Tasks within RichOutputFormat?

Chesnay Schepler
Essentially I see 2 options here:
a) Split your output format so that each wrapped format is its own sink, then follow Flavio's suggestion: filter the stream and apply each sink to one of the filtered streams, with the respective parallelism. This would be the recommended approach.
b) Modify your (custom?) output format so that open() creates only one of the Hadoop output formats, chosen by subtask index, and apply a custom partitioner to the input DataStream that routes each element, based on the conditions, to the respective subtasks. I would not recommend this, though, because it could become quite a maintenance headache.
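Option (b) can be sketched as a plain-Java model without a Flink dependency. The nested `Partitioner` interface only mirrors the shape of Flink's `org.apache.flink.api.common.functions.Partitioner` (`int partition(K key, int numPartitions)`), which is what would be passed to `partitionCustom`; `RoutingPartitioner`, `chooseFormat` and `parA` are hypothetical names. Subtasks `[0, parA)` handle A and `[parA, numPartitions)` handle B, and the output format's `open(taskNumber, numTasks)` (the `OutputFormat` signature in Flink versions of that era) would pick its wrapped format using the same rule.

```java
import java.util.function.Predicate;

// Plain-Java model of option (b); names are hypothetical, not Flink APIs.
class OptionBSketch {

    /** Mirrors the shape of Flink's Partitioner<K>: pick a target subtask. */
    interface Partitioner<K> {
        int partition(K key, int numPartitions);
    }

    /** Sends A-records to subtasks [0, parA) and B-records to [parA, numPartitions). */
    static final class RoutingPartitioner<K> implements Partitioner<K> {
        private final Predicate<K> goesToA;
        private final int parA;

        RoutingPartitioner(Predicate<K> goesToA, int parA) {
            this.goesToA = goesToA;
            this.parA = parA;
        }

        @Override
        public int partition(K key, int numPartitions) {
            if (parA <= 0 || parA >= numPartitions) {
                throw new IllegalStateException("both A and B need at least one subtask");
            }
            int parB = numPartitions - parA;
            int h = key.hashCode();
            // floorMod keeps the index in range even for negative hash codes.
            return goesToA.test(key) ? Math.floorMod(h, parA) : parA + Math.floorMod(h, parB);
        }
    }

    /** The decision the output format's open(taskNumber, numTasks) would make. */
    static String chooseFormat(int taskNumber, int parA) {
        return taskNumber < parA ? "A" : "B";
    }

    public static void main(String[] args) {
        RoutingPartitioner<Integer> p = new RoutingPartitioner<>(k -> k % 10 < 7, 7);
        for (int k = 0; k < 12; k++) {
            int subtask = p.partition(k, 10);
            System.out.println(k + " -> subtask " + subtask + " writes with " + chooseFormat(subtask, 7));
        }
    }
}
```

The partitioner, the output format and the sink's parallelism all have to agree on where A's range of subtasks ends; keeping them in sync is the kind of maintenance burden that makes option (a) preferable.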

On 12/23/2020 9:53 AM, Flavio Pompermaier wrote:

RE: Distribute Parallelism/Tasks within RichOutputFormat?

Hailu, Andreas

Thanks Chesnay, Flavio – I believe Flavio’s first recommendation will work well enough. I agree that the second approach may be a bit finicky to use long-term.

 

Cheers.

 

// ah

 

From: Chesnay Schepler <[hidden email]>
Sent: Wednesday, December 23, 2020 4:07 AM
To: Flavio Pompermaier <[hidden email]>; Hailu, Andreas [Engineering] <[hidden email]>
Cc: [hidden email]
Subject: Re: Distribute Parallelism/Tasks within RichOutputFormat?

 
