Flink kafka consumers stopped consuming messages

Flink kafka consumers stopped consuming messages

idkfaon
Hi there,

today I observed strange behaviour in a Flink streaming application (Flink 1.6.1, per-job cluster deployment on YARN):
3 task managers (2 slots each) are running, but only 1 slot is actually consuming messages from Kafka (v0.11.0.2); the others were idling (currentOutputWatermark was stuck, and the numRecordsOutPerSecond metric was 0).

So I started to investigate:
- `kafka-run-class.sh kafka.tools.GetOffsetShell` showed that the offsets for all 6 topic partitions are constantly increasing.
- `kafka-consumer-groups.sh` listed only a single partition (the 4th). That makes me think that somehow 5 Kafka consumers lost their connection to the brokers.
- A LOT of messages "Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity." in each task manager instance (see the sketch after this list).
- 5 of 6 slots didn't advance currentOutputWatermark for about 3 days.
- no application errors/uncaught exceptions etc.
- no reconnections to Kafka.
- some network issues related to HDFS (Slow waitForAckedSeqno).
- all Kafka networking settings are at their defaults (e.g. timeouts).
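
For reference, a minimal sketch of how such a source is typically wired up (assuming the FlinkKafkaConsumer011 connector; all names and values below are placeholders, not our exact config). With checkpointing enabled, offsets are committed back to Kafka only when a checkpoint completes, so `kafka-consumer-groups.sh` reflects checkpoint-time progress rather than live consumption, and a skipped commit produces exactly the warning quoted above.
```
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Offsets are committed back to Kafka only on checkpoint completion.
env.enableCheckpointing(60000L) // placeholder interval

val props = new Properties()
props.setProperty("bootstrap.servers", "broker-1:9092") // placeholder
props.setProperty("group.id", "my-flink-job")           // placeholder

val kafkaSource = new FlinkKafkaConsumer011[String]("my-topic", new SimpleStringSchema(), props)
// Default when checkpointing is enabled; the commit is best-effort and may be
// skipped, which is what the warning above reports.
kafkaSource.setCommitOffsetsOnCheckpoints(true)
```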

After a job restart, all task managers started consuming messages (6 slots in total, and `kafka-consumer-groups.sh` showed all 6 partitions being consumed).

Maybe someone has already experienced something similar?

Job topology is as follows (no window operations!):
```
val dataStream = env.addSource(kafkaSource).map(processor)

val terminalStream = AsyncDataStream
    .unorderedWait(dataStream, asyncFun, timeout, timeoutUnit)
    .process(sideOutputFun)

terminalStream
    .keyBy(selector)
    .process(keyProcFun)
    .addSink(kafkaSink_1)

terminalStream
    .getSideOutput(outputTag) // outputTag: OutputTag[...] emitted inside sideOutputFun
    .addSink(kafkaSink_2)
```

Re: Flink kafka consumers stopped consuming messages

Arvid Heise-4
Hi Ilya,

These messages could pop up when a Kafka broker is down but should eventually disappear. So I'm a bit lost.

If there was a bug, it's also most likely fixed in the meantime. So if you want to be on the safe side, I'd try to upgrade to more recent versions (Flink + Kafka consumer).

Best,

Arvid

On Wed, Jun 2, 2021 at 7:01 PM Ilya Karpov <[hidden email]> wrote:

Re: Flink kafka consumers stopped consuming messages

idkfaon
Hi Arvid,
thanks for the reply,

Thread-dump and log research didn't help at first. We suspected that the problem was in the async call to a remote key-value storage because we (1) found that the async client timeout was set to 0 (effectively no timeout, i.e. it could idle indefinitely), (2) saw that the async client threads were sleeping, and (3) saw that the AsyncWaitOperator.Emitter thread was blocked peeking for a new async result while AsyncWaitOperator.processWatermark was blocked trying to put a new item into the queue. We changed the timeout to a non-zero value and since then (for a week or so) the job hasn't hung. So I guess the problem was the async client timeout (not Kafka or Flink).
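
In case it helps, here is a minimal sketch of the two timeouts involved (the values below are illustrative, not our production settings): the operator-level timeout passed to `AsyncDataStream.unorderedWait` bounds how long each in-flight element may wait for its async result, while the client-level timeout we ended up fixing bounds the remote call itself.
```
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.scala._

// Illustrative values only; dataStream and asyncFun are the same as in the topology above.
val enriched = AsyncDataStream.unorderedWait(
  dataStream,            // stream coming from the Kafka source
  asyncFun,              // AsyncFunction calling the remote key-value storage
  30, TimeUnit.SECONDS,  // operator timeout per in-flight element
  100                    // capacity: max queued requests before backpressure
)
```
With a finite operator timeout, a hung async call eventually surfaces (by default via a timeout exception) instead of silently blocking the operator queue, the watermark processing and, transitively, the Kafka source.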

Hope this helps someone!

On 9 June 2021, at 14:10, Arvid Heise <[hidden email]> wrote:
