Unbalanced Kafka consumer consumption


Unbalanced Kafka consumer consumption

gerardg
Hi,

We are experiencing issues scaling our Flink application, and we have observed
that it may be because Kafka message consumption is not balanced across
partitions. The attached image (lag per partition) shows how only one
partition is being consumed (the blue one in the back), and it wasn't until
it caught up that the other ones started to consume at a good rate (the
total throughput actually multiplied by 4 when they started). Also, when those
started to consume, one partition simply stopped being consumed and accumulated
messages again until the others finished.

We don't see any resource (CPU, network, disk...) struggling in our cluster,
so we are not sure what could be causing this behavior. I can only assume
that somehow Flink or the Kafka consumer is artificially slowing down the
other partitions, maybe due to how back pressure is handled?

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1007/consumer_max_lag.png>

Gerard

Re: Unbalanced Kafka consumer consumption

Elias Levy
You can always shuffle the stream generated by the Kafka source (dataStream.shuffle()) to evenly distribute records downstream.
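A minimal sketch of what that looks like (the topic, properties, and map function below are placeholders, not taken from the original job):

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class ShuffleAfterKafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "kafka:9092"); // placeholder address
        kafkaProps.setProperty("group.id", "example-group");       // placeholder group id

        DataStream<String> source = env.addSource(
                new FlinkKafkaConsumer011<>("example-topic", new SimpleStringSchema(), kafkaProps));

        // shuffle() redistributes records round-robin, so downstream subtasks get an
        // even share of work regardless of how the Kafka partitions are consumed.
        source.shuffle()
              .map(new MapFunction<String, Integer>() {
                  @Override
                  public Integer map(String value) {
                      return value.length(); // stand-in for the real processing
                  }
              })
              .print();

        env.execute("shuffle-example");
    }
}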


Re: Unbalanced Kafka consumer consumption

gerardg
The stream is partitioned by key after ingestion at the finest granularity that we can (which is finer than how the stream is partitioned when it is produced to Kafka). It is not perfectly balanced, but it is not so unbalanced as to explain this behavior (it is more balanced than what the lag images show).
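Roughly, the topology looks like this (just a sketch that reuses the env/kafkaProps setup from the snippet in the previous message; the key extractor is made up, the real job derives a much finer-grained key from each event):

// Assumes the same imports, env, and kafkaProps as the earlier sketch,
// plus org.apache.flink.api.java.functions.KeySelector.
DataStream<String> events = env.addSource(
        new FlinkKafkaConsumer011<>("example-topic", new SimpleStringSchema(), kafkaProps));

events.keyBy(new KeySelector<String, String>() {
          @Override
          public String getKey(String value) {
              return value.split(",")[0]; // placeholder for the fine-grained key
          }
      })
      .map(new MapFunction<String, String>() {
          @Override
          public String map(String value) {
              return value; // stand-in for the real per-key processing
          }
      })
      .print();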

Anyway, let's assume that the problem is that the stream is so unbalanced that one operator subtask can't handle the ingestion rate. Is it expected then that all the other operator subtasks reduce their ingestion rate even if they have resources to spare? The job uses processing time and there are no windows. If that is the case, is there a way to let operator subtasks process freely even if one of them is causing back pressure upstream?

The attached images show how the Kafka lag increases while the throughput stays stable until some operator subtasks finish.

Thanks,

Gerard


Attachments: kafka-lag.png (183K), throughput.png (209K)

Re: Unbalanced Kafka consumer consumption

bupt_ljy
In reply to this post by gerardg

Hi,

If I understand your problem correctly, there is a similar JIRA issue, FLINK-10348, reported by me. Maybe you can take a look at it.

Best,
Jiayi Liao



Re: Unbalanced Kafka consumer consumption

gerardg
I think my problem is not the same. Yours is that you want to consume faster from partitions with more data, to avoid first draining the ones with fewer elements, which could advance the event time too fast. Mine is that the Kafka consumer only reads from some partitions even though it seems to have resources to read and process all of them at the same time.

Gerard


Re: Unbalanced Kafka consumer consumption

Till Rohrmann
Hi Gerard,

the behaviour you are describing sounds odd to me. I have a couple of questions:

1. Which Flink and Kafka versions are you using?
2. How many partitions do you have? Try setting the parallelism of your job to the number of partitions; that way you will have one partition per source task (see the sketch after this list).
3. How are the source operators distributed? Are they running on different nodes?
4. What do you mean by "until it (the blue one) was finished consuming the partition"? I assume that you don't ingest into the Kafka topic live but want to read persisted data.
5. Are you using Flink's metrics to monitor the different source tasks? Check what the source operator's output rate is (it should be visible from the web UI).
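
For 2., something along these lines (just a sketch; the consumer class, topic, properties, and the partition count are placeholders, assuming env and kafkaProps are set up as in the earlier snippets):

// Match the source parallelism to the number of Kafka partitions,
// so each source subtask reads exactly one partition.
int numPartitions = 32; // illustrative: set this to the topic's actual partition count

DataStream<String> stream = env
        .addSource(new FlinkKafkaConsumer011<>("example-topic", new SimpleStringSchema(), kafkaProps))
        .setParallelism(numPartitions);

// Or set the default parallelism for the whole job:
// env.setParallelism(numPartitions);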

Cheers,
Till


Re: Unbalanced Kafka consumer consumption

gerardg
Hi Till,

Sorry for the late reply, I was on holiday and couldn't follow up on the issue.

1. Flink 1.6.1, Kafka 1.1.0
2. The topic has 64 partitions. We don't have that many slots available, but we could try this.
3. Yes, they are running on different nodes.
4. I meant that until the operator represented by the blue line in the background (the line shows its lag, i.e. the amount of messages left to consume) finishes consuming all its pending messages, the rate does not increase. Yes, the problem appears when there is pending (persisted) data, either because the job can't keep up with the incoming rate or because the app has been stopped.
5. Yes. I guess you mean numRecordsOutPerSecond. We will monitor this so we have data the next time it happens.

I'll try to reduce the number of partitions so they can be assigned one per source task and see how it behaves.

Thanks,

Gerard


Re: Unbalanced Kafka consumer consumption

gerardg
We finally figured it out. We had a large value for the Kafka consumer option 'max.partition.fetch.bytes', which made the KafkaConsumer consume at an unbalanced rate across partitions.
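
For reference, the relevant consumer setting looks roughly like this (the values and names below are illustrative, not our exact configuration):

// Keep max.partition.fetch.bytes moderate so a single partition's fetches cannot
// dominate each poll; 1048576 bytes (1 MB) is Kafka's default.
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "kafka:9092");       // placeholder
kafkaProps.setProperty("group.id", "example-group");             // placeholder
kafkaProps.setProperty("max.partition.fetch.bytes", "1048576");  // 1 MB, the Kafka default

FlinkKafkaConsumer011<String> consumer =
        new FlinkKafkaConsumer011<>("example-topic", new SimpleStringSchema(), kafkaProps);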

Gerard

Re: Unbalanced Kafka consumer consumption

Till Rohrmann
Great to hear and thanks for letting us know.

Cheers,
Till
