Parallelism questions


Parallelism questions

Alexx
Hi everyone!

1. Is there a way to increase the parallelism (e.g. through REST) of some operators in a job without re-deploying the job? I found this answer, which mentions scaling at runtime on Yarn/Mesos. Is that possible? How? Is Kubernetes supported?
2. What happens when the number of parallel operator instances exceeds the number of task slots? For example: a job with a source (parallelism 3), a map (parallelism 8), and a sink (parallelism 3), for a total of 14 operator instances, on a setup with 8 task slots. Will the operators get chained? What if I disable operator chaining?

Thank you!

Re: Parallelism questions

Dawid Wysakowicz-2

Hi Alexandru,

As for 2: generally speaking, the number of required slots depends on the number of slot sharing groups. By default, all operators belong to the default slot sharing group, which means a job requires as many slots as the highest parallelism used in the job. You can read more about the distributed runtime here [1].
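To make this concrete, here is a minimal sketch of your example job (source 3, map 8, sink 3); MySource, MyMapper and MySink are just placeholder implementations:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(new MySource()).setParallelism(3)   // source: 3 subtasks
   .map(new MyMapper()).setParallelism(8)         // map: 8 subtasks
   .addSink(new MySink()).setParallelism(3);      // sink: 3 subtasks

// All operators are in the "default" slot sharing group, so one slot can run
// one subtask of each operator => the job needs max(3, 8, 3) = 8 slots.
env.execute("slot-sharing-example");

If, as far as I remember the rules correctly, you moved the map into its own group with .slotSharingGroup("map"), the sink would inherit that group from its input, and the job would then need 3 slots for the default group plus 8 for the "map" group, i.e. 11 in total.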

As for 1, I cc'ed Gary and Till, who might be better able to answer your question.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/concepts/runtime.html#task-slots-and-resources

Best,

Dawid


Re: Parallelism questions

Till Rohrmann
Hi Alexandru,

You can use the `modify` command, `bin/flink modify <JOB_ID> --parallelism <PARALLELISM>`, to change the parallelism of a job. At the moment it is implemented by first taking a savepoint, stopping the job, and then redeploying the job with the changed parallelism, resuming from the savepoint.
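For example, rescaling a job to parallelism 6 (the job id, savepoint directory, and jar name below are placeholders):

bin/flink modify <JOB_ID> --parallelism 6

which is roughly the same as doing it manually via a savepoint:

bin/flink cancel -s hdfs:///flink/savepoints <JOB_ID>
bin/flink run -s <path-to-created-savepoint> -p 6 your-job.jar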

Cheers,
Till


Re: Parallelism questions

Alexx
Thanks Till!

To execute the above (when using Kubernetes), would one enter the running JobManager service and execute it there?
And does the following REST API call do the same: /jobs/:jobid/rescaling?

I assume it changes the base parallelism, but what will it do if I have already set the parallelism of my operators explicitly?
e.g.
env.addSource(..)
   .setParallelism(3)
   .uid(..)
   .map(..)
   .setParallelism(8)
   .uid(..)
   .addSink(..)
   .setParallelism(3)
   .uid(..);

I think it would be a good idea for /jobs/:jobid/rescaling to additionally accept the operator UID as a query parameter, so that the parallelism of specific operators could be changed.
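Something along these lines (just a sketch of what I have in mind, not an existing endpoint):

/jobs/<JOB_ID>/rescaling?operatorUID=<UID>&parallelism=12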

Best,
Alex.


Re: Parallelism questions

Till Rohrmann
Hi Alexandru,

At the moment, `/jobs/:jobid/rescaling` will always change the parallelism of all operators. The upper bound is the maximum parallelism that you have defined for an operator.
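If I remember the 1.7 REST spec correctly (please double-check the REST API docs for your version), it is a PATCH request with the target parallelism as a query parameter, and it returns a trigger id that you can poll for the result; with placeholder ids it looks roughly like this:

PATCH /jobs/<JOB_ID>/rescaling?parallelism=6
GET   /jobs/<JOB_ID>/rescaling/<TRIGGER_ID>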

I agree that it should also be possible to rescale an individual operator. The internal functionality is already implemented (see JobMaster#rescaleOperators), but it has not been exposed yet.

Cheers,
Till


Re: Parallelism questions

Alexx
That's great news!

Are there any plans to expose it in the upcoming Flink release?


Re: Parallelism questions

Till Rohrmann
I'm not aware of anyone working on this feature right now.
