Set Parallelism and keyBy


Dominik Bruhn
Hey,
I have a Flink job with a default parallelism of 2. I want to
key the stream and then apply a flatMap on the keyed stream. The
flatMap operation is quite costly, so I want a much higher
parallelism here (let's say 16). Additionally, it is important that
the flatMap operation for a given key is always executed in the same
process or the same task.

I have the following code:

----
env.setParallelism(2)
val input: DataStream[Event] = /* from somewhere */
input.keyBy(_.eventType).flatMap(new ExpensiveOperation()).print()
----

This works fine, and the "ExpensiveOperation" is always executed on
the same task for the same keys.

Now I tried two things:

1.
----
env.setParallelism(2)
val input: DataStream[Event] = /* from somewhere */
input.keyBy(_.eventType).setParallelism(16).flatMap(new ExpensiveOperation()).print()
----
This fails with an exception because setParallelism cannot be called
on the keyBy operator.

2.
-----
env.setParallelism(2)
val input: DataStream[Event] = /* from somewhere */
input.keyBy(_.eventType).flatMap(new ExpensiveOperation()).setParallelism(16).print()
-----
While this executes, it appears to break the assignment of keys to
tasks: the "ExpensiveOperation" is no longer always executed on the
same node for the same key (visible from the prefixes in the print()
output).

What am I doing wrong? Is my only option to set the parallelism of
the whole Flink job to 16?

Thanks, have nice holidays,
Dominik

Re: Set Parallelism and keyBy

Jamie Grier
Dominik,

This should work just as you expect. Maybe the output of print() is misleading you: the print() operation will still have a parallelism of 2, but the flatMap() will have a parallelism of 16, and all data elements with the same key will be routed to the same flatMap instance.

Any sequence of keyBy().flatMap() will always properly partition the data across the instances of the flatMap() operator by key.
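The guarantee described above can be illustrated with a small, self-contained sketch. This is a simplified model, not Flink's actual implementation (Flink hashes each key into a key group sized by the maximum parallelism and then maps key groups to subtasks), but it shows why the target subtask is a pure function of the key and the operator's parallelism, so the same key always reaches the same flatMap instance. The "eventType" key names here are hypothetical and only mirror the example in the thread:

```scala
// Simplified model of keyed routing: hash the key, reduce it modulo the
// operator's parallelism. The subtask index depends only on the key, so
// every record with the same key lands on the same subtask.
// (Sketch only; Flink's real scheme routes via key groups, but the
// determinism property illustrated here is the same.)
object KeyRoutingSketch {
  def subtaskFor(key: String, parallelism: Int): Int =
    ((key.hashCode % parallelism) + parallelism) % parallelism

  def main(args: Array[String]): Unit = {
    val eventTypes = Seq("click", "view", "click", "purchase", "click")
    val targets = eventTypes.map(t => t -> subtaskFor(t, 16))
    targets.foreach { case (t, idx) => println(s"$t -> subtask $idx") }
    // All "click" events route to one and the same subtask index.
    val clickTargets = targets.collect { case ("click", idx) => idx }
    assert(clickTargets.distinct.size == 1)
  }
}
```

Note that print() running at parallelism 2 adds its own subtask prefix to each output line, which is why the printed output can look shuffled even though the flatMap instances received a stable key assignment.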

-Jamie


On Mon, Dec 26, 2016 at 10:52 AM, Dominik Bruhn <[hidden email]> wrote:



--

Jamie Grier
data Artisans, Director of Applications Engineering