Rebalance to subtasks in same TaskManager instance

johannes.barnard@clarivate.com
Hi,

I have a streaming topology with source parallelism of M and a target
operator parallelism of N.
For optimum performance I have found that I need to choose M and N
independently.
Also, the source subtasks do not all produce the same number of records, and
therefore I have to rebalance to the target operator to get optimum
throughput.

The record sizes vary a lot (up to 10 MB) but average about 200 kB.

Through experimentation using the rescale() operator I have found that
maximum throughput can be significantly increased if I restrict this
rebalancing to target subtasks within the same TaskManager instances.

However, I cannot use rescale() for this purpose, as it does not rebalance
across all target subtasks in the instance.
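
To illustrate the difference described above, here is a minimal sketch in plain Java (no Flink dependency). It is a toy model, not Flink's implementation: it assumes N is a multiple of M and assumes that rescale() gives each source subtask a contiguous block of N/M target subtasks, which matches the documented 2-to-6 example but whose exact index assignment is an assumption. The class and method names are hypothetical. Whether the targets in a block live in the same TaskManager depends on scheduling and is not modelled here.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of which target subtasks a single source subtask can send to. */
public class FanOutSketch {

    /** rebalance(): every source subtask round-robins over all n targets. */
    static List<Integer> rebalanceTargets(int sourceIndex, int m, int n) {
        List<Integer> targets = new ArrayList<>();
        for (int t = 0; t < n; t++) {
            targets.add(t);
        }
        return targets;
    }

    /**
     * rescale(), simplified: assumes n is a multiple of m and that each source
     * subtask round-robins over its own contiguous block of n / m targets.
     */
    static List<Integer> rescaleTargets(int sourceIndex, int m, int n) {
        if (m <= 0 || n % m != 0) {
            throw new IllegalArgumentException("sketch only covers n being a multiple of m");
        }
        int block = n / m;
        List<Integer> targets = new ArrayList<>();
        for (int t = sourceIndex * block; t < (sourceIndex + 1) * block; t++) {
            targets.add(t);
        }
        return targets;
    }

    public static void main(String[] args) {
        int m = 2; // source parallelism
        int n = 6; // target parallelism
        for (int s = 0; s < m; s++) {
            System.out.println("source " + s
                    + " rebalance -> " + rebalanceTargets(s, m, n)
                    + ", rescale -> " + rescaleTargets(s, m, n));
        }
    }
}
```

In this model a source subtask under rescale() only ever reaches its own block, so a slow or busy source cannot spill work onto the other targets, even if they run in the same instance.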

I was hoping to use a custom Partitioner to achieve this, but it is not clear
to me which partition corresponds to which subtask.

Is there any way currently to achieve this with Flink?

If it helps I believe the feature I am hoping to achieve is similar to
Storm's "Local or shuffle grouping".
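
The behaviour I have in mind could be sketched roughly as follows, in plain Java with no Flink dependency. Everything here is hypothetical: the `Selector` class and its list of "local" target indices are assumptions for illustration. Flink's `Partitioner.partition(key, numPartitions)` receives no such locality information, which is exactly the missing piece. The sketch also uses round-robin rather than a random shuffle so that its behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of a "local or shuffle" channel selector (hypothetical, not a Flink API). */
public class LocalOrShuffleSketch {

    static final class Selector {
        private final List<Integer> candidates;
        private int next = 0;

        /**
         * @param numTargets   total number of target subtasks
         * @param localTargets indices of target subtasks assumed to run in the
         *                     same instance as this sender (may be empty)
         */
        Selector(int numTargets, List<Integer> localTargets) {
            if (numTargets <= 0) {
                throw new IllegalArgumentException("numTargets must be positive");
            }
            List<Integer> chosen = new ArrayList<>();
            if (localTargets.isEmpty()) {
                // No local targets: fall back to distributing over all targets.
                for (int t = 0; t < numTargets; t++) {
                    chosen.add(t);
                }
            } else {
                // Local targets exist: restrict distribution to them.
                chosen.addAll(localTargets);
            }
            this.candidates = chosen;
        }

        /** Returns the target index for the next record, cycling round-robin. */
        int selectChannel() {
            int channel = candidates.get(next);
            next = (next + 1) % candidates.size();
            return channel;
        }
    }

    public static void main(String[] args) {
        List<Integer> local = new ArrayList<>();
        local.add(2);
        local.add(3);
        Selector selector = new Selector(6, local);
        for (int i = 0; i < 4; i++) {
            System.out.println("record " + i + " -> target " + selector.selectChannel());
        }
    }
}
```

With local targets 2 and 3 out of 6, records alternate between those two only; with an empty local list the selector cycles over all targets, like an ordinary rebalance.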

Any help or suggestions will be appreciated.
Hans

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Rebalance to subtasks in same TaskManager instance

Piotr Nowojski
Hi,

Unfortunately, I don’t think this is currently possible in Flink. Please feel free to submit a feature request for it on our JIRA: https://issues.apache.org/jira/projects/FLINK/summary

Have you tried out the setup using rebalance? In most cases the overhead of rebalance over rescale is not as high as one might think.

Piotrek

On 5 Feb 2018, at 15:16, [hidden email] wrote:

> [...]


Re: Rebalance to subtasks in same TaskManager instance

johannes.barnard@clarivate.com
Hi Piotrek,

Yes, I've compared rebalance with rescale. I adjusted the parallelism of the
source and target operators so that rescale would behave more or less like
the "local or shuffle grouping" option. I was able to show that, for my use
case, a "local or shuffle grouping" option would yield a performance
improvement of roughly 30% or more over rebalance.

I will submit a feature request.

Thanks for your prompt reply.

Hans