Regression for dataStream.rescale method from 1.2.1 to 1.3.2

Antoine Philippot
Hi,

After migrating our project from Flink 1.2.1 to Flink 1.3.2, we noticed a big performance drop caused by poor balancing of vertices across task managers.

In our use case, we set the default parallelism to the number of task managers:
  val stream: DataStream[Array[Byte]] = env.addSource(new FlinkKafkaConsumer09[Array[Byte]]( ... ))
                  .name("kafkaConsumer").rescale // 1 operator / instance

  val parallelism = nbTaskManagers * nbTaskSlots
  val hydratedStream: DataStream[Message] = stream
    .flatMap(avroDeserializer).name("AvroDeserializer").setParallelism(parallelism)
    .flatMap(messageParser).name("MessageParser").setParallelism(parallelism)
    .flatMap(messageHydration).name("Hydration").setParallelism(parallelism)
    .filter(MessageFilter).name("MessageFilter").setParallelism(parallelism)

  hydratedStream.rescale // 1 operator / instance
    .addSink(kafkaSink).name("KafkaSink")
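
To make the intended topology concrete, here is a minimal plain-Scala sketch (no Flink dependency) of how `rescale` is documented to connect subtasks when one parallelism is a multiple of the other: each subtask exchanges data with a fixed subset of its peers rather than with all of them. The object and method names are hypothetical, and the contiguous grouping of subtask indices is an assumption made for illustration. It models only the logical connections, not where Flink actually schedules the subtasks.

```scala
// Hypothetical helper, not part of Flink: models the logical connections
// created by rescale between two operators with different parallelism.
object RescaleSketch {

  /** For each downstream subtask index, the upstream subtask indices feeding it.
    * Assumes one parallelism is an exact multiple of the other and that
    * subtasks are grouped contiguously (an illustrative assumption). */
  def upstreamOf(upstream: Int, downstream: Int): Map[Int, List[Int]] = {
    require(upstream > 0 && downstream > 0, "parallelism must be positive")
    if (upstream >= downstream) {
      require(upstream % downstream == 0, "sketch only covers exact multiples")
      val factor = upstream / downstream
      // Each downstream subtask reads from `factor` consecutive upstream subtasks.
      (0 until downstream).map(d => d -> (d * factor until (d + 1) * factor).toList).toMap
    } else {
      require(downstream % upstream == 0, "sketch only covers exact multiples")
      val factor = downstream / upstream
      // Each upstream subtask feeds `factor` consecutive downstream subtasks.
      (0 until downstream).map(d => d -> List(d / factor)).toMap
    }
  }

  def main(args: Array[String]): Unit = {
    val nbTaskManagers = 2 // also the default parallelism in the job above
    val nbTaskSlots = 4
    val parallelism = nbTaskManagers * nbTaskSlots

    // kafkaConsumer (default parallelism) -> AvroDeserializer (parallelism)
    val sourceToMap = upstreamOf(nbTaskManagers, parallelism)
    // MessageFilter (parallelism) -> KafkaSink (default parallelism)
    val mapToSink = upstreamOf(parallelism, nbTaskManagers)

    println(sourceToMap.toList.sortBy(_._1))
    println(mapToSink.toList.sortBy(_._1))
  }
}
```

Under these assumptions each sink subtask reads from exactly `nbTaskSlots` filter subtasks, which is why the job only avoids network transfer if those subtasks are scheduled on the same task manager as their sink.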

Take the example of 2 task managers with 4 slots each.
With Flink 1.2.1, each instance ran:
- 1 kafkaConsumer -> 4 mapOperators -> 1 kafkaSink

But with exactly the same code on Flink 1.3.2, all the sinks are placed on one instance:
first instance:
- 1 kafkaConsumer -> 4 mapOperators -> 2 kafkaSink
second instance:
- 1 kafkaConsumer -> 4 mapOperators -> no kafkaSink (network transfer to the first task manager)
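
The cost of the two layouts can be put in numbers with a small plain-Scala model. This is only a sketch: the object, its methods, and the rule that map subtask `i` runs on task manager `i / nbTaskSlots` and feeds sink subtask `i / nbTaskSlots` are assumptions chosen to match the example of 2 task managers with 4 slots each, not Flink's actual scheduling logic.

```scala
// Hypothetical model, not Flink code: counts map-to-sink connections that
// cross task-manager boundaries for a given sink placement.
object PlacementSketch {
  val nbTaskManagers = 2
  val nbTaskSlots = 4

  // Assumption: map subtask i runs on task manager i / nbTaskSlots.
  def mapHost(mapSubtask: Int): Int = mapSubtask / nbTaskSlots

  // Assumption: sink subtask s reads from map subtasks
  // s * nbTaskSlots until (s + 1) * nbTaskSlots.
  def sinkOf(mapSubtask: Int): Int = mapSubtask / nbTaskSlots

  /** Number of map subtasks whose sink runs on a different task manager.
    * sinkHosts(s) is the task manager hosting sink subtask s. */
  def remoteEdges(sinkHosts: Vector[Int]): Int =
    (0 until nbTaskManagers * nbTaskSlots)
      .count(m => mapHost(m) != sinkHosts(sinkOf(m)))

  def main(args: Array[String]): Unit = {
    val balanced = Vector(0, 1) // one sink per instance, as described for 1.2.1
    val skewed = Vector(0, 0)   // both sinks on the first instance, as described for 1.3.2
    println(s"balanced: ${remoteEdges(balanced)} remote edges")
    println(s"skewed: ${remoteEdges(skewed)} remote edges")
  }
}
```

In this model the balanced layout has 0 remote map-to-sink connections and the skewed layout has 4, meaning half of the map subtasks ship their output over the network.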

This behaviour is the same with more task managers, whether on a local cluster or on a YARN cluster.

Is this a bug, or should I update my code to get the same behaviour as with Flink 1.2.1?

Re: Regression for dataStream.rescale method from 1.2.1 to 1.3.2

Till Rohrmann
Hi Antoine,

This looks like a regression to me. I'll investigate how this could happen and let you know once I find something.

Cheers,
Till

On Fri, Oct 13, 2017 at 10:16 AM, Antoine Philippot <[hidden email]> wrote:
> [...]