Tuning parallelism in cascading-flink planner



Ken Krugler
Hi all,

I’m busy tuning a workflow (defined with Cascading, planned with Flink) that runs on a 5-slave EMR cluster.

The default parallelism (from the Flink planner) is set to 40, since I’ve got 5 task managers (one per node) and 8 slots/TM.

But this seems to jam things up, as I see simultaneous GroupReduce subtasks competing for resources (or so it seems).

Any insight into how to tune this?

And what’s the right way to set it on a sub-task basis? With Cascading Flows planned for M-R, I can set the number of reducers via a Hadoop JobConf configuration setting on a per-step basis (to use Cascading lingo). But with a Flow planned for Flink, there’s only a single “step”.
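For context, the per-step setting referred to above boils down to a standard Hadoop property. A minimal sketch of building those per-step properties (the property name `mapreduce.job.reduces` is standard Hadoop; the `ConfigDef` usage in the comment is an assumption about Cascading's API, shown for illustration only):

```java
import java.util.Properties;

public class PerStepReducers {

    // Builds the per-step properties that set the reduce-task count
    // for a Cascading step planned for M-R.
    public static Properties reducerProps(int numReducers) {
        Properties props = new Properties();
        // Standard Hadoop property controlling the number of reduce tasks.
        props.setProperty("mapreduce.job.reduces", Integer.toString(numReducers));
        return props;
    }

    public static void main(String[] args) {
        Properties props = reducerProps(20);
        // In Cascading these would typically be attached per step,
        // e.g. (assumed API, for illustration):
        //   pipe.getStepConfigDef().setProperty(
        //       ConfigDef.Mode.REPLACE, "mapreduce.job.reduces", "20");
        System.out.println(props.getProperty("mapreduce.job.reduces"));
    }
}
```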

Thanks,

— Ken


Re: Tuning parallelism in cascading-flink planner

Fabian Hueske
Hi Ken,

At the moment, there are only two parameters that control the parallelism of the Flink operators generated by the Cascading-Flink connector.

The parameters are:
- flink.num.sourceTasks to specify the parallelism of source tasks.
- flink.num.shuffleTasks to specify the parallelism of all shuffling tasks (GroupBy, CoGroup).

Non-shuffling operators such as Each/Map and HashJoin take the parallelism of their predecessor (for HashJoin, the first input) to avoid unnecessary shuffling.
So an Each/Map or HashJoin that immediately follows a source runs with the source parallelism.
Effectively, most operators run with the shuffle parallelism, because Each and HashJoin pick it up once their input has been shuffled.
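Concretely, both parameters are plain key/value flow properties. A minimal sketch of setting them (the property names are the two listed above; passing them to a `FlinkConnector` is an assumed usage of the cascading-flink API and is shown only in comments):

```java
import java.util.Properties;

public class FlinkParallelismConfig {

    // Builds the flow properties that control the parallelism of
    // operators planned by the Cascading-Flink connector.
    public static Properties buildProperties() {
        Properties props = new Properties();
        // Parallelism of source tasks (parameter name from the list above).
        props.setProperty("flink.num.sourceTasks", "8");
        // Parallelism of all shuffling tasks (GroupBy, CoGroup).
        props.setProperty("flink.num.shuffleTasks", "16");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProperties();
        // These properties would typically be handed to the connector when
        // building the flow, e.g. (assumed usage, for illustration):
        //   FlowConnector connector = new FlinkConnector(props);
        //   connector.connect(flowDef).complete();
        System.out.println(props.getProperty("flink.num.sourceTasks"));
        System.out.println(props.getProperty("flink.num.shuffleTasks"));
    }
}
```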

It is currently not possible to set the parallelism of an individual task.
However, I am open to suggestions if you have an idea for improving the situation.
I think we should continue the discussion on the Cascading-Flink GitHub project, since this feature would not require changes in Flink, only in the Cascading-Flink runner.

Best, Fabian

2016-04-27 6:25 GMT+02:00 Ken Krugler <[hidden email]>: