At end of complex parallel flow, how to force end step with parallel=1?


At end of complex parallel flow, how to force end step with parallel=1?

Garrett Barton
I have a complex algorithm implemented using the DataSet API, and by default it runs with parallelism 90 for good performance. At the end I want to perform a clustering of the resulting data, and to do that correctly I need to pass all the data through a single thread/process.

I read in the docs that as long as I did a global reduce using DataSet.reduceGroup(new GroupReduceFunction...), it would be forced to a single thread.  Yet when I run the flow and bring it up in the UI, I see parallelism 90 all the way through the DAG, including this step.

Is there a config or feature to force the flow back to a single thread?  Or should I just split this into two completely separate jobs?  I'd rather not split, as I would like to use Flink's ability to iterate on this algorithm-and-clustering combo.

Thank you

Re: At end of complex parallel flow, how to force end step with parallel=1?

Gábor Gévay
Hi Garrett,

You can call .setParallelism(1) on just this operator:

ds.reduceGroup(new GroupReduceFunction...).setParallelism(1)

Best,
Gabor
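For anyone finding this thread later, a fuller sketch of the idea (assumes the Flink DataSet API on the classpath; the summing reducer is a hypothetical stand-in for the actual clustering step):

```java
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

public class PinnedEndStep {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(90); // wide parallelism for the main flow

        DataSet<Long> data = env.generateSequence(1, 1000); // stand-in for the complex flow

        DataSet<Long> result = data
            // reduceGroup without a preceding groupBy() is a global reduce:
            // all records form one group, processed by a single instance.
            .reduceGroup(new GroupReduceFunction<Long, Long>() {
                @Override
                public void reduce(Iterable<Long> values, Collector<Long> out) {
                    long sum = 0;
                    for (Long v : values) sum += v;
                    out.collect(sum);
                }
            })
            // pins just this operator to parallelism 1, leaving the rest at 90
            .setParallelism(1);

        result.print();
    }
}
```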




Re: At end of complex parallel flow, how to force end step with parallel=1?

Garrett Barton
Gábor,
Thank you for the reply. I gave that a go and the flow still showed parallelism 90 for each step.  Is the UI not 100% accurate, perhaps?

To get around it for now, I implemented a partitioner that sends all the data to the same partition. A hack, but it works!
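The workaround described above might look roughly like this (a sketch only; the names are hypothetical and Flink's DataSet API is assumed on the classpath):

```java
import org.apache.flink.api.common.functions.Partitioner;

// A partitioner that routes every record to the same partition,
// forcing all data through one downstream task.
public class SinglePartitioner implements Partitioner<Long> {
    @Override
    public int partition(Long key, int numPartitions) {
        return 0; // ignore the key; everything goes to partition 0
    }
}

// Hypothetical usage inside the job:
//   data.partitionCustom(new SinglePartitioner(), record -> record.getKey())
//       .mapPartition(new ClusteringFunction());
```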



Re: At end of complex parallel flow, how to force end step with parallel=1?

Fabian Hueske-2
Hi Garrett,

that's strange. DataSet.reduceGroup() creates a non-parallel GroupReduce operator, so even without manually setting the parallelism to 1, the operator should not run in parallel.
What might happen, though, is that a combiner is applied to locally pre-reduce the data before it is shipped to the single instance.
Does your GroupReduceFunction implement a combine interface (e.g., GroupCombineFunction)?

I'm not aware of visualization problems in the web UI.
Can you maybe share a screenshot of the UI showing the issue?

Thanks, Fabian
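The combiner case Fabian describes can be sketched like this (assumes Flink's DataSet API; the sum is a placeholder for real pre-aggregation logic):

```java
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.util.Collector;

// A GroupReduceFunction that also implements GroupCombineFunction.
// The combine() calls run in parallel on the sending side before the
// data reaches the single reduce instance, which can look like parallel
// work in the web UI even though the final reduce is non-parallel.
public class CombinableSum
        implements GroupReduceFunction<Long, Long>, GroupCombineFunction<Long, Long> {

    @Override
    public void combine(Iterable<Long> values, Collector<Long> out) throws Exception {
        reduce(values, out); // pre-aggregate locally with the same logic
    }

    @Override
    public void reduce(Iterable<Long> values, Collector<Long> out) {
        long sum = 0;
        for (Long v : values) sum += v;
        out.collect(sum);
    }
}
```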



Re: At end of complex parallel flow, how to force end step with parallel=1?

Garrett Barton
Fabian,

Turns out I was wrong.  My flow was in fact running as two separate jobs, because I was trying to use a local variable calculated by ...distinct().count() in a downstream flow.  The second flow did set the parallelism correctly!  Thank you for the help. :)
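For reference, the behavior described above, as a sketch (not a complete program; assumes Flink's DataSet API and an existing DataSet<Long> named data): count() and collect() are eager operations, each triggering execution of the plan built so far, so using their result downstream creates a second job.

```java
// Job #1 executes here: distinct().count() runs the plan up to this
// point and returns the result to the client program.
long n = data.distinct().count();

// Everything built after this point forms a SECOND plan, executed as
// a separate job with its own (correctly applied) parallelism settings.
DataSet<Double> normalized = data.map(x -> x / (double) n);
```

If keeping everything in one job matters (e.g., for iterations), one alternative is to compute the count as a DataSet and hand it to downstream operators via withBroadcastSet() instead of a driver-side variable.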




Re: At end of complex parallel flow, how to force end step with parallel=1?

Fabian Hueske-2
Hi Garrett,

thanks for reporting back!
Glad you could resolve the issue :-)

Best, Fabian
