Hey,
I have a Flink job with a default parallelism of 2. I want to key the stream and then apply a flatMap on the keyed stream. The flatMap operation is quite costly, so I want a much higher parallelism for it (let's say 16). Additionally, it is important that the flatMap operation is always executed for the same key in the same process or in the same task. I have the following code:

----
env.setParallelism(2)
val input: DataStream[Event] = /* from somewhere */
input.keyBy(_.eventType).flatMap(new ExpensiveOperation()).print()
----

This works fine, and "ExpensiveOperation" is always executed on the same tasks for the same keys.

Now I tried two things:

1.
----
env.setParallelism(2)
val input: DataStream[Event] = /* from somewhere */
input.keyBy(_.eventType).setParallelism(16).flatMap(new ExpensiveOperation()).print()
----

This fails with an exception because I can't set the parallelism on the keyBy operator.

2.
----
env.setParallelism(2)
val input: DataStream[Event] = /* from somewhere */
input.keyBy(_.eventType).flatMap(new ExpensiveOperation()).setParallelism(16).print()
----

While this executes, it breaks the assignment of keys to tasks: "ExpensiveOperation" is no longer always executed on the same nodes (visible from the prefixes in the print() output).

What am I doing wrong? Is my only option to set the parallelism of the whole Flink job to 16?

Thanks, have nice holidays,
Dominik
Dominik,

This should work just as you expect. Maybe the output of print() is just misleading you. The print() operation will still have a parallelism of two, but the flatMap() will have a parallelism of 16, and all data elements with the same key will get routed to the same flatMap() instance. The prefix that print() writes is the index of the print() subtask that emitted the line, not of the flatMap() instance that processed the record, so it can change for the same key.

Any sequence of keyBy().flatMap() will always properly partition the data across the instances of the flatMap() operator by key.

-Jamie

On Mon, Dec 26, 2016 at 10:52 AM, Dominik Bruhn wrote:
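The routing described in the reply can be illustrated with a small, self-contained Scala model of case 2 (keyBy, then flatMap at parallelism 16, then print at parallelism 2). This is a simplified sketch, not Flink code: the `Event` class, `flatMapIndexFor`, and `route` are hypothetical, the key-to-instance function is a plain `hashCode` modulo 16 rather than Flink's actual key hashing, and the print edge is modeled as a round-robin redistribution, which is how Flink connects two operators of different parallelism when no partitioning is specified. The model shows each key staying pinned to one flatMap instance while its print prefix still alternates.

```scala
// Simplified model of: keyBy(_.eventType) -> flatMap (parallelism 16) -> print (parallelism 2)
// This is NOT Flink's implementation; it only reproduces the routing properties.
object KeyRoutingSketch {
  // Hypothetical stand-in for the thread's `Event` type.
  final case class Event(eventType: String, payload: Int)

  val flatMapParallelism = 16
  val printParallelism = 2

  // Stand-in for keyBy's hash partitioning: depends on the key only, so the same key
  // always lands on the same flatMap instance. (Flink hashes keys differently.)
  def flatMapIndexFor(key: String): Int =
    Math.floorMod(key.hashCode, flatMapParallelism)

  // Returns (eventType, flatMapIndex, printIndex) for every record, in input order.
  def route(events: Seq[Event]): Seq[(String, Int, Int)] = {
    // Each flatMap instance keeps its own round-robin counter toward the print instances.
    // Counters start at 0 here for determinism.
    val nextTarget = Array.fill(flatMapParallelism)(0)
    events.map { e =>
      val fm = flatMapIndexFor(e.eventType)
      val pr = nextTarget(fm)
      nextTarget(fm) = (pr + 1) % printParallelism
      (e.eventType, fm, pr)
    }
  }

  def main(args: Array[String]): Unit = {
    val events = Seq("click", "view", "click", "buy", "click", "view").zipWithIndex.map {
      case (t, i) => Event(t, i)
    }
    route(events).foreach { case (t, fm, pr) =>
      // print() prefixes each line with its own (1-based) subtask index, e.g. "2> ..."
      println(s"${pr + 1}> $t handled by flatMap instance $fm")
    }
  }
}
```

Running `main` shows every `click` record reporting the same flatMap instance while appearing under both `1>` and `2>`. To check this in a real job, one option is to implement `ExpensiveOperation` as a `RichFlatMapFunction` and include `getRuntimeContext.getIndexOfThisSubtask` in its output, since that index identifies the flatMap instance rather than the print instance.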