Hi Ken,
At the moment, there are just two parameters to control the parallelism of the Flink operators generated by the Cascading-Flink connector.
The parameters are:
- flink.num.sourceTasks to specify the parallelism of source tasks.
- flink.num.shuffleTasks to specify the parallelism of all shuffling tasks (GroupBy, CoGroup).
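
Both parameters are plain key/value properties. A minimal sketch of setting them, assuming they go into the java.util.Properties object that is handed to the connector when the flow is created (the class name ParallelismConfig and the values 4 and 16 are made up for illustration):

```java
import java.util.Properties;

public class ParallelismConfig {

    // Builds a Properties object carrying the two parallelism parameters.
    // How the object reaches the connector is an assumption, see above.
    public static Properties build(int sourceTasks, int shuffleTasks) {
        Properties props = new Properties();
        // Parallelism of source tasks.
        props.setProperty("flink.num.sourceTasks", Integer.toString(sourceTasks));
        // Parallelism of all shuffling tasks (GroupBy, CoGroup).
        props.setProperty("flink.num.shuffleTasks", Integer.toString(shuffleTasks));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(4, 16);
        System.out.println("sources:  " + props.getProperty("flink.num.sourceTasks"));
        System.out.println("shuffles: " + props.getProperty("flink.num.shuffleTasks"));
    }
}
```
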
Non-shuffling operators such as Each/Map and HashJoin take the parallelism of their predecessor (for HashJoin, the first input) so that the data does not have to be randomly reshuffled between operators.
So an Each/Map or HashJoin that immediately follows a source runs with the source parallelism.
Effectively, most operators will run with the shuffle parallelism, because Each and HashJoin pick it up once their input has been shuffled.
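
To illustrate the rule, here is a toy model for a linear pipeline. It is not the connector's actual code. The class ParallelismModel, the enum Kind and the method resolve are hypothetical names, and FORWARD stands for any non-shuffling operator (Each/Map, or HashJoin seen from its first input):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ParallelismModel {

    // SOURCE: a source task; SHUFFLE: GroupBy or CoGroup;
    // FORWARD: a non-shuffling operator that inherits from its predecessor.
    public enum Kind { SOURCE, SHUFFLE, FORWARD }

    // Returns the parallelism of each operator in a linear pipeline.
    public static List<Integer> resolve(List<Kind> pipeline, int sourceTasks, int shuffleTasks) {
        List<Integer> result = new ArrayList<>();
        int current = sourceTasks;
        for (Kind kind : pipeline) {
            if (kind == Kind.SOURCE) {
                current = sourceTasks;      // flink.num.sourceTasks
            } else if (kind == Kind.SHUFFLE) {
                current = shuffleTasks;     // flink.num.shuffleTasks
            }
            // FORWARD keeps the predecessor's parallelism unchanged.
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // source -> Each -> GroupBy -> Each -> HashJoin
        List<Kind> pipeline = Arrays.asList(
                Kind.SOURCE, Kind.FORWARD, Kind.SHUFFLE, Kind.FORWARD, Kind.FORWARD);
        System.out.println(resolve(pipeline, 4, 16)); // prints [4, 4, 16, 16, 16]
    }
}
```

Only the Each directly after the source keeps the source parallelism of 4. Everything after the GroupBy runs with the shuffle parallelism of 16.
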
It is currently not possible to specify the parallelism of an individual task.
However, I am open to suggestions if you have a good idea to improve the situation.
I think we should continue the discussion on the Cascading-Flink GitHub project, since this feature would not require changes in Flink but only in the Cascading-Flink runner.
Best, Fabian