Possibly very slow keyBy in program with no parallelism


Possibly very slow keyBy in program with no parallelism

Theo

Hi,

 

I wrote a small Flink program on a YARN cluster (128 GB RAM, 8-core Xeon CPU per node) that essentially reads messages from Kafka and applies a simple CEP rule to those messages. The program is expected to have a parallelism of 1 for the input, as my test Kafka topic has only 1 partition.

 

The program looks pretty much like this:

 

FlinkKafkaConsumer<Databean> flinkKafkaConsumer = new FlinkKafkaConsumer<>(sourceTopicName, deserializer, properties);
flinkKafkaConsumer.setStartFromGroupOffsets();
flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true);
flinkKafkaConsumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Databean>(Time.minutes(30)) {
            @Override
            public long extractTimestamp(Databean element) {
                return element.getTs();
            }
        });

SingleOutputStreamOperator<Databean> kafkaSource = env.addSource(flinkKafkaConsumer);

// Keying here is very slow???
KeyedStream<Databean, String> keyedStream = kafkaSource.keyBy(bean -> bean.getUser());

Pattern<Databean, Databean> cepPattern = Pattern.<Databean>
        begin("firstStep", AfterMatchSkipStrategy.skipPastLastEvent()).where(new FirstFilter())
        .followedBy("secondStep").where(new SecondFilter())
        .within(Time.minutes(15));

PatternStream<Databean> patternMatchStream = CEP.pattern(keyedStream, cepPattern);

SingleOutputStreamOperator<Alarmbean> beanAlerts = patternMatchStream.select(new MatchToAlarmConverter());

beanAlerts.addSink(new FlinkKafkaProducer<>(config.kafkaAlertsTopic, new AlarmBeanSerializeSchema(), properties));

 

The applied CEP filters "FirstFilter" and "SecondFilter" are very simple rules like

return "humidity".equals(bean.getRecordedMetricType());
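
A sketch of what such a filter looks like (the class body below is only illustrative, the real filters are not shown here):

import org.apache.flink.cep.pattern.conditions.SimpleCondition;

// Illustrative sketch of "FirstFilter"; the real class is analogous but not shown verbatim.
public class FirstFilter extends SimpleCondition<Databean> {
    @Override
    public boolean filter(Databean bean) {
        // Never matches in my test data, so CEP produces no matches downstream.
        return "humidity".equals(bean.getRecordedMetricType());
    }
}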

 

My Databean has roughly 50 fields containing numbers and small strings (up to 50 characters).

Currently, I write 200,000 elements into my Kafka topic every 5 minutes. 100,000 of those have the same username, i.e. they all have the name "empty", and the other half are almost unique (some random number between 1 and 100000000). The event timestamp of the generated data varies randomly by ±7.5 minutes around the generation time (generation time = time pushed into Kafka).

My CEP rule is written with conditions that never match, so the Kafka sink as well as the stream select function can be ruled out as causes of the slow processing speed.

 

I start the application via yarn with:

"${FLINK_HOME}/bin/yarn-session.sh" -n 4 -jm 4096m -tm 65536m --name "TESTJOB" -d
${FLINK_HOME}/bin/flink run -m ${FLINK_HOST} -d -n "${JOB_JAR}" $*
 

So the job has plenty of RAM available, but I didn't notice any difference in speed when assigning 16 GB or 64 GB of RAM. As expected, I have a single task manager and parallelism 1.

 

Now about my problem:

Currently, the pipeline processes roughly 150-300 elements per second. On startup, it peaks at 3000-4000 elements per second but slows down within one minute to 150-300 elements per second.

I initially expected CEP to simply be that slow (as this is my first CEP experiment), but I observed the following:

  1. Even though CEP has quite some overhead (elements must be sorted by time), my rule is very simple and should, from my perspective, perform much better on that machine. My shot in the dark beforehand was something like 10,000-100,000 elements/s.
  2. None of my machine resources is fully utilized, i.e. no cluster CPU runs at 100% utilization (according to htop). Plenty of memory is available, but the RES column in htop shows the process using 5499 MB.
  3. According to the Flink GUI, the job is split into two tasks: first there is the source task, then a hash arrow to the second task (the keyBy?!), and the second task is CEP pattern apply, convert and write to sink. From the UI, I know that task 1 shows HIGH backpressure whereas task 2 is OK in terms of the backpressure measurement. Even more interesting: the metrics show that in the first task "0.buffers.outputQueueLength" is constantly at 9 and "0.buffers.outPoolUsage" is constantly at 1. This fits the backpressure concept (the task is stuck writing to the buffer for the next stage because the next stage consumes too slowly); however, the second task's metrics tell me that "0.buffers.inputQueueLength" is constantly 0 and "0.buffers.inPoolUsage" is constantly 0 as well. "0.numRecordsInPerSecond" is about 150-300 elements/s.

 

This led me to suspect that I don't have a CEP problem but rather a problem with "keyBy" on localhost, as the second task seems to immediately consume any messages it receives in its input queue. My questions:

  1. Is my observation correct that I indeed don’t have a CEP problem but the keyBy causes the issue here?
  2. Why is the queue limited to size 9? Seems really small compared to the memory available… I would have expected something like 10000 at least.
  3. What happens internally here? In distributed mode, I understand that I have a distributed queue where the sender queue size and receiver queue size can differ. But on the same machine, in the same JVM, I would have expected something like a BlockingQueue where both task metrics report the same queue size (i.e. task1.outputqueue == task2.inputqueue). I suspect something happens between "task1 writes" and "task2 receives" that slows down the entire pipeline, but I have no idea what that could be.
  4. Can I do something to boost performance by orders of magnitude here?

 

Best regards

Theo Diefenthal

Re: Possibly very slow keyBy in program with no parallelism

Dawid Wysakowicz-2

Hi Theo,

Could you try replacing the CEP operator with a simple flatMap to see whether CEP is the reason for the backpressure? Another cause of this behavior might be the serialization time of the records (what is the serialization format?). You could also try enabling object reuse[1]. It would also be a good idea to run the Flink job under a profiler to find the hot spots.
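
A minimal sketch of such a test might look like this (assuming the keyedStream and Databean from your snippet; the flatMap only forwards records, so CEP is completely taken out of the picture):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.util.Collector;

// Optional: avoid defensive copies between chained operators.
env.getConfig().enableObjectReuse();

// Pass-through flatMap instead of CEP, results discarded,
// so only source + keyBy + record handover remain in the pipeline.
keyedStream
        .flatMap(new FlatMapFunction<Databean, Databean>() {
            @Override
            public void flatMap(Databean value, Collector<Databean> out) {
                out.collect(value); // simply forward the record
            }
        })
        .addSink(new DiscardingSink<>());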

One more thing I noticed: you have set the bounded out-of-orderness to 30 minutes, which makes the CEP operator accumulate events for 30 minutes (for the sorting) before they are forwarded to the underlying state machine for processing. Could you try decreasing this value?
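
For example (a sketch based on the extractor from your snippet; since your generated data is at most ~7.5 minutes out of order, something like 8 minutes should still be safe):

// Same extractor as before, but with a much tighter out-of-orderness bound,
// so the CEP operator buffers far less data before sorting and processing it.
flinkKafkaConsumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Databean>(Time.minutes(8)) {
            @Override
            public long extractTimestamp(Databean element) {
                return element.getTs();
            }
        });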

I can't help with an explanation of how the network buffers work, but I'm cc'ing Piotr, who should be better suited to provide meaningful information.

Regards,

Dawid


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/execution_configuration.html#execution-configuration

Re: Possibly very slow keyBy in program with no parallelism

Piotr Nowojski-3
Hi Theo,

Regarding the performance issue.

> None of my machine resources is fully utilized, i.e. none of the cluster CPU runs at 100% utilization (according to htop). And the memory is virtually available, but the RES column in htop states the processes uses 5499MB.

By the nature of stream processing, where low latencies are the goal, it can sometimes be difficult to achieve 100% CPU utilisation, since Flink needs to do some synchronisation/record handover once per record. However, the first thing you should try is to increase parallelism.
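
For example (just a sketch, assuming enough task slots are available; the source stays at parallelism 1 because the topic has a single partition, while the keyed CEP part can use more subtasks after the keyBy):

// Run the keyed part of the pipeline with (for example) 4 parallel subtasks.
env.setParallelism(4);

// The source is pinned to 1, since there is only 1 Kafka partition to read from.
SingleOutputStreamOperator<Databean> kafkaSource = env
        .addSource(flinkKafkaConsumer)
        .setParallelism(1);

// keyBy redistributes records by user across all downstream subtasks,
// so the CEP operator and sink inherit the higher parallelism.
KeyedStream<Databean, String> keyedStream = kafkaSource.keyBy(bean -> bean.getUser());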

> however, the second task’s metric tell me that “0.buffers.inputQueueLength” is constant on value 0 and “0.buffers.inPoolUsage” is constant on value 0 as well.

Unfortunately, it is an undocumented "feature" that inputQueueLength doesn't account for local input channels [1]. Since you are using only one TaskManager, this metric will always be 0 for you.

> Is my observation correct that I indeed don’t have a CEP problem but the keyBy causes the issue here?

I would guess this is unlikely. Flink's network stack is able to transport tens of millions of records per second per machine (depending on the size of the record and the serialisation cost).

In your case I would still guess that CEP is the bottleneck, and it will probably become visible if you increase the number of TaskManagers. You can also run a CPU profiler to confirm that.

> Why is the queue limited to size 9? Seems really small compared to the memory available… I would have expected something like 10000 at least.

Those metrics are in "buffers", not records - one buffer = 32 KB of data.

The number 9 comes from credit-based flow control [2]: every channel is limited to 2 exclusive buffers, plus there are 7 "floating" buffers shared among all of the input channels. In your case, with a single input channel, that means the upper bound is 9. These numbers are chosen so that 9 buffers, given reasonably normal latencies/pings between nodes, should ensure maximal throughput even if only a single input channel is used, while the 2 exclusive buffers per channel allow us to ensure fairness and mitigate some latency issues during checkpointing.

> What happens internally here? In distributed mode, I understand that I have a distributed queue where sender queue size and receiver queue size can be different. But
> on the same machine, in the same JVM, I would have expected something like a BlockingQueue 

Yes, local input channels are blocking queues of buffers. (On top of that, InputGates are blocking queues of input channels). 

> Can I do something in order to boost performance by magnitudes here?

To be honest, I have never tried to optimise the CEP operator, nor do I know whether it can be tuned. One simple option is to increase parallelism and/or scale out. If you would like to optimise the CEP operator, we would be happy to accept/review such a contribution :)

Best regards, Piotrek

