Flink kafka consumer stopped committing offsets

Juho Autio
Hi,

We have a Flink stream job that uses Flink kafka consumer. Normally it commits consumer offsets to Kafka.

However this stream ended up in a state where it's otherwise working just fine, but it isn't committing offsets to Kafka any more. The job keeps writing correct aggregation results to the sink, though. At the time of writing this, the job has been running 14 hours without committing offsets.

Below is an extract from taskmanager.log. As you can see, after startup it didn't log anything until ~2018-06-07 22:08. That's also where the log ends; these are the last lines so far.

Could you help check if this is a known bug, possibly already fixed, or something new?

I'm using a self-built Flink package 1.5-SNAPSHOT, flink commit 8395508b0401353ed07375e22882e7581d46ac0e which is not super old.

Cheers,
Juho

2018-06-06 10:01:33,498 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka version : 0.10.2.1
2018-06-06 10:01:33,498 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka commitId : e89bffd6b2eff799
2018-06-06 10:01:33,560 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) for group aggregate-all_server_measurements_combined-20180606-1000.
2018-06-06 10:01:33,563 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) for group aggregate-all_server_measurements_combined-20180606-1000.
2018-06-07 22:08:28,773 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) dead for group aggregate-all_server_measurements_combined-20180606-1000
2018-06-07 22:08:28,776 WARN  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Auto-commit of offsets {topic1-2=OffsetAndMetadata{offset=12300395550, metadata=''}, topic1-18=OffsetAndMetadata{offset=12299210444, metadata=''}, topic3-0=OffsetAndMetadata{offset=5064277287, metadata=''}, topic4-6=OffsetAndMetadata{offset=5492398559, metadata=''}, topic2-1=OffsetAndMetadata{offset=89817267, metadata=''}, topic1-10=OffsetAndMetadata{offset=12299742352, metadata=''}} failed for group aggregate-all_server_measurements_combined-20180606-1000: Offset commit failed with a retriable exception. You should retry committing offsets.
2018-06-07 22:08:29,840 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) dead for group aggregate-all_server_measurements_combined-20180606-1000
2018-06-07 22:08:29,841 WARN  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Auto-commit of offsets {topic1-6=OffsetAndMetadata{offset=12298347875, metadata=''}, topic4-2=OffsetAndMetadata{offset=5492779112, metadata=''}, topic1-14=OffsetAndMetadata{offset=12299972108, metadata=''}} failed for group aggregate-all_server_measurements_combined-20180606-1000: Offset commit failed with a retriable exception. You should retry committing offsets.

Re: Flink kafka consumer stopped committing offsets

Piotr Nowojski
Hi,

What’s your KafkaConsumer configuration? Especially values for:
- is checkpointing enabled?
- enable.auto.commit (or auto.commit.enable for Kafka 0.8) / auto.commit.interval.ms
- did you set setCommitOffsetsOnCheckpoints() ?


> Note that the Flink Kafka Consumer does not rely on the committed offsets for fault tolerance guarantees. The committed offsets are only a means to expose the consumer’s progress for monitoring purposes.

Can you post the full logs from all TaskManagers and the JobManager, and can you estimate when the committing broke/stopped? Did you check the Kafka logs for any errors?

To me it seems more like a Kafka issue/bug, especially since in your case the failed offset commit is preceded by a Kafka coordinator failure.

Piotrek

Re: Flink kafka consumer stopped committing offsets

Juho Autio
Hi Piotr, thanks for your insights.

> What’s your KafkaConsumer configuration?

We only set these in the properties that are passed to FlinkKafkaConsumer010 constructor:

auto.offset.reset=latest
bootstrap.servers=my-kafka-host:9092
group.id=my_group
flink.partition-discovery.interval-millis=30000
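
For reference, the properties listed above could be built roughly like this. This is only a sketch: the class name `ConsumerPropsSketch` is made up for illustration, and the two auto-commit entries are not set in the job described here. They are written out only to make visible the Kafka consumer defaults (`enable.auto.commit=true`, `auto.commit.interval.ms=5000`) that would apply when nothing is configured.

```java
import java.util.Properties;

// Illustrative only: mirrors the four properties listed above and spells out
// the two Kafka consumer defaults that apply when auto-commit is not configured.
final class ConsumerPropsSketch {

    static Properties build() {
        Properties props = new Properties();
        // Properties taken from the post above.
        props.setProperty("auto.offset.reset", "latest");
        props.setProperty("bootstrap.servers", "my-kafka-host:9092");
        props.setProperty("group.id", "my_group");
        props.setProperty("flink.partition-discovery.interval-millis", "30000");
        // Assumption: NOT set in the original job; these are the Kafka
        // consumer defaults, written out explicitly for clarity.
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "5000");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Setting the two auto-commit values explicitly should not change behaviour; it only removes the guesswork about which defaults are in effect.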

> is checkpointing enabled?

No.

> enable.auto.commit (or auto.commit.enable for Kafka 0.8) / auto.commit.interval.ms

We have whatever is the default behaviour of Flink kafka consumer. It seems to commit quite often, something like every 5 seconds.

> did you set setCommitOffsetsOnCheckpoints() ?

No. But I checked with debugger that apparently enableCommitOnCheckpoints=true is the default.

I also checked with debugger that offsetCommitMode=KAFKA_PERIODIC.
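
The combination seen in the debugger (no checkpointing, auto-commit left at its default, `offsetCommitMode=KAFKA_PERIODIC`) is consistent with a decision along the following lines. This is a simplified standalone sketch of that logic as I understand it, not the actual Flink code; the class, enum and method names are illustrative.

```java
// Simplified, standalone sketch of how the offset commit mode appears to be
// chosen. Names are illustrative and are not the Flink API.
final class OffsetCommitModeSketch {

    enum Mode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    static Mode choose(boolean autoCommit, boolean commitOnCheckpoints, boolean checkpointing) {
        if (checkpointing) {
            // With checkpointing on, Kafka's own auto-commit is not used;
            // offsets are committed only when a checkpoint completes.
            return commitOnCheckpoints ? Mode.ON_CHECKPOINTS : Mode.DISABLED;
        }
        // Without checkpointing, committing is left to the Kafka client's
        // periodic auto-commit, if it is enabled.
        return autoCommit ? Mode.KAFKA_PERIODIC : Mode.DISABLED;
    }

    public static void main(String[] args) {
        // The situation in this thread: auto-commit on (default),
        // commit-on-checkpoints true (default), checkpointing off.
        System.out.println(choose(true, true, false)); // prints KAFKA_PERIODIC
    }
}
```

Under this reading, `enableCommitOnCheckpoints=true` has no effect while checkpointing is off, so the committing is done entirely by the Kafka client library.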

So I guess you're right that this bug doesn't seem to be in Flink itself? I wonder if it's a known issue in the Kafka client lib.

I also took a thread dump on one of the task managers in this broken state. But I couldn't spot anything obvious when comparing the threads to a dump from a job where offsets are being committed. Anyway, I've saved the thread dump in case there's something to look for specifically.

Sharing the full logs of the job & task managers would be a bit of a hassle, because I don't have an automatic way to obfuscate the logs so that I'm sure there isn't anything sensitive left. Anyway, there isn't really anything else to share. I wrote: "As you can see, it didn't log anything until ~2018-06-07 22:08. Also that's where the log ends".

Thanks once more.

Re: Flink kafka consumer stopped committing offsets

Piotr Nowojski
The more I look into it, the more it seems like a Kafka bug or some cluster failure from which your Kafka cluster did not recover.

In your case auto-commit should be enabled, and in that case KafkaConsumer should commit offsets periodically while it’s polling messages. Unless, for example, `coordinatorUnknown()` keeps returning true in `org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#maybeAutoCommitOffsetsAsync` (Kafka 0.10.2.1 code base), in which case the commit is skipped and only the deadline is pushed back:

private void maybeAutoCommitOffsetsAsync(long now) {
    if (autoCommitEnabled) {
        if (coordinatorUnknown()) {
            this.nextAutoCommitDeadline = now + retryBackoffMs;
        } else if (now >= nextAutoCommitDeadline) {
            this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
            doAutoCommitOffsetsAsync();
        }
    }
}
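
To make the effect of that branch concrete, here is a small standalone simulation of the same deadline logic. It is a sketch only: it does not use the Kafka client, the class `AutoCommitSimulation` and its fields are made up for illustration, and the timings (poll every 1000 ms, 5000 ms commit interval, 100 ms retry backoff, coordinator unknown between t=10000 and t=20000) are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone simulation of the auto-commit deadline logic quoted above.
// Nothing here talks to Kafka; a "commit" is just a recorded timestamp.
final class AutoCommitSimulation {
    private final long retryBackoffMs;
    private final long autoCommitIntervalMs;
    private long nextAutoCommitDeadline;
    boolean coordinatorUnknown;
    final List<Long> commitTimes = new ArrayList<>();

    AutoCommitSimulation(long retryBackoffMs, long autoCommitIntervalMs) {
        this.retryBackoffMs = retryBackoffMs;
        this.autoCommitIntervalMs = autoCommitIntervalMs;
        this.nextAutoCommitDeadline = autoCommitIntervalMs;
    }

    // Same branching as maybeAutoCommitOffsetsAsync with auto-commit enabled.
    void maybeAutoCommit(long now) {
        if (coordinatorUnknown) {
            // No commit is attempted; the deadline is only pushed back.
            nextAutoCommitDeadline = now + retryBackoffMs;
        } else if (now >= nextAutoCommitDeadline) {
            nextAutoCommitDeadline = now + autoCommitIntervalMs;
            commitTimes.add(now);
        }
    }

    // Polls once per simulated second for 30 seconds; the coordinator is
    // treated as unknown from t=10000 up to (but excluding) t=20000.
    static List<Long> run() {
        AutoCommitSimulation sim = new AutoCommitSimulation(100, 5000);
        for (long now = 0; now <= 30000; now += 1000) {
            sim.coordinatorUnknown = now >= 10000 && now < 20000;
            sim.maybeAutoCommit(now);
        }
        return sim.commitTimes;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [5000, 20000, 25000, 30000]
    }
}
```

In the simulation, commits resume as soon as the coordinator is known again. A consumer that never commits again after the "Marking the coordinator ... dead" message would therefore fit a situation where the coordinator stays unknown from the consumer's point of view.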

Have you checked the Kafka logs? This suggests that the real problem is hidden behind:

>  INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) dead for group aggregate-all_server_measurements_combined-20180606-1000

And maybe your Kafka cluster/consumer cannot recover from this situation.

Another (simpler) thing to try is to just upgrade the Kafka cluster.

Piotrek

Re: Flink kafka consumer stopped committing offsets

amit pal
Your Kafka consumer is probably rebalancing. This can happen when message processing takes too long, so that the Kafka broker marks your consumer dead and triggers a rebalance. All of this happens before the consumer can commit its offsets.

Re: Flink kafka consumer stopped committing offsets

Juho Autio
Hi,

Thanks for your analysis.

We found that LeaderElectionRateAndTimeMs went to a non-zero value on Kafka at around the same time as this error was seen in the Flink job.

Kafka itself recovers from this, and so do all the other consumers that we have. It seems like a bug in the Kafka consumer library if this error causes it to stop committing offsets. If you have any further insight into this, please let me know.

Apart from that, leader election doesn't happen in a normal situation. But it can happen, for example, if there are connectivity problems between the Kafka nodes.

On Mon, Jun 11, 2018 at 6:41 PM amit pal <[hidden email]> wrote:
Probably your kafka consumer is rebalancing.  This can be due to a bigger message processing time due to which kafka broker is marking your consumer dead and rebalancing. This all happens before the consumer can commit the offsets.

On Mon, Jun 11, 2018 at 7:37 PM Piotr Nowojski <[hidden email]> wrote:
The more I look into it, the more it seems like a Kafka bug or some cluster failure from which your Kafka cluster did not recover.

In your cases auto committing should be set to true and in that case KafkaConsumer should commit offsets once every so often when it’s polling messages. Unless for example `cordinatorUnknown()` returns false in `org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#maybeAutoCommitOffsetsAsync` (Kafka 0.10.2.1 code base):

private void maybeAutoCommitOffsetsAsync(long now) {
    if (autoCommitEnabled) {
        if (coordinatorUnknown()) {
            this.nextAutoCommitDeadline = now + retryBackoffMs;
        } else if (now >= nextAutoCommitDeadline) {
            this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
            doAutoCommitOffsetsAsync();
        }
    }
}

Have you checked Kafka logs? This suggests that the real problem is hidden behind:

>  INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: <a href="tel:(214)%20748-3550" value="+12147483550" target="_blank">2147483550 rack: null) dead for group aggregate-all_server_measurements_combined-20180606-1000

And maybe your Kafka cluster/consumer can not recover from this situation.

Another thing to try (simpler) is to just trying upgrading Kafka cluster.

Piotrek

On 11 Jun 2018, at 11:44, Juho Autio <[hidden email]> wrote:

Hi Piotr, thanks for your insights.

> What’s your KafkaConsumer configuration?

We only set these in the properties that are passed to FlinkKafkaConsumer010 constructor:

auto.offset.reset=latest
bootstrap.servers=my-kafka-host:9092
group.id=my_group
flink.partition-discovery.interval-millis=30000

> is checkpointing enabled?

No.

> enable.auto.commit (or auto.commit.enable for Kafka 0.8) / auto.commit.interval.ms

We have whatever is the default behaviour of Flink kafka consumer. It seems to commit quite often, something like every 5 seconds.

> did you set setCommitOffsetsOnCheckpoints() ?

No. But I checked with debugger that apparently enableCommitOnCheckpoints=true is the default.

I also checked with debugger that offsetCommitMode=KAFKA_PERIODIC.
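
The way these three answers combine into KAFKA_PERIODIC can be sketched roughly as follows. This is a standalone re-implementation for illustration, based on my understanding of how the Flink consumer picks its commit mode, not a copy of the Flink code; the class name `CommitModeSketch` and the method name are assumptions.

```java
// Illustrative re-implementation; names are hypothetical, not Flink's API.
class CommitModeSketch {
    enum OffsetCommitMode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    static OffsetCommitMode fromConfiguration(
            boolean enableAutoCommit,
            boolean enableCommitOnCheckpoint,
            boolean enableCheckpointing) {
        if (enableCheckpointing) {
            // With checkpointing on, the Kafka auto-commit setting is ignored.
            return enableCommitOnCheckpoint
                    ? OffsetCommitMode.ON_CHECKPOINTS
                    : OffsetCommitMode.DISABLED;
        }
        // Without checkpointing, committing is left to the Kafka client.
        return enableAutoCommit
                ? OffsetCommitMode.KAFKA_PERIODIC
                : OffsetCommitMode.DISABLED;
    }

    public static void main(String[] args) {
        // Setup described above: auto commit on (default), commit on
        // checkpoints on (default), checkpointing off.
        System.out.println(fromConfiguration(true, true, false));
    }
}
```

With checkpointing disabled, enableCommitOnCheckpoints has no effect in this model, so offset committing depends entirely on the Kafka client's periodic auto commit.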

So I guess you're right that this bug doesn't seem to be in Flink itself? I wonder if it's a known issue in the Kafka client library.

I also took a thread dump on one of the task managers in this broken state. But I couldn't spot anything obvious when comparing the threads to a dump from a job where offsets are being committed. Anyway, I've saved the thread dump in case there's something to look for specifically.

Sharing the full logs of job & task managers would be a bit of a hassle, because I don't have an automatic way to obfuscate the logs so that I'm sure that there isn't anything sensitive left. Anyway, there isn't anything else to share really. I wrote: "As you can see, it didn't log anything until ~2018-06-07 22:08. Also that's where the log ends".

Thanks once more.

On Mon, Jun 11, 2018 at 11:18 AM, Piotr Nowojski <[hidden email]> wrote:
Hi,

What’s your KafkaConsumer configuration? Especially values for:
- is checkpointing enabled?
- enable.auto.commit (or auto.commit.enable for Kafka 0.8) / auto.commit.interval.ms
- did you set setCommitOffsetsOnCheckpoints() ?


> Note that the Flink Kafka Consumer does not rely on the committed offsets for fault tolerance guarantees. The committed offsets are only a means to expose the consumer’s progress for monitoring purposes.

Can you post full logs from all TaskManagers/JobManager, and can you say/estimate when the committing broke/stopped? Did you check Kafka logs for any errors?

To me it seems more like a Kafka issue/bug, especially since in your case the offset commit failures are preceded by a Kafka coordinator failure.

Piotrek


On 8 Jun 2018, at 10:05, Juho Autio <[hidden email]> wrote:

Hi,

We have a Flink stream job that uses Flink kafka consumer. Normally it commits consumer offsets to Kafka.

However this stream ended up in a state where it's otherwise working just fine, but it isn't committing offsets to Kafka any more. The job keeps writing correct aggregation results to the sink, though. At the time of writing this, the job has been running 14 hours without committing offsets.

Below is an extract from taskmanager.log. As you can see, it didn't log anything until ~2018-06-07 22:08. Also that's where the log ends, these are the last lines so far.

Could you help check if this is a known bug, possibly already fixed, or something new?

I'm using a self-built Flink package 1.5-SNAPSHOT, flink commit 8395508b0401353ed07375e22882e7581d46ac0e which is not super old.

Cheers,
Juho

2018-06-06 10:01:33,498 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka version : 0.10.2.1
2018-06-06 10:01:33,498 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka commitId : e89bffd6b2eff799
2018-06-06 10:01:33,560 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) for group aggregate-all_server_measurements_combined-20180606-1000.
2018-06-06 10:01:33,563 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) for group aggregate-all_server_measurements_combined-20180606-1000.
2018-06-07 22:08:28,773 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) dead for group aggregate-all_server_measurements_combined-20180606-1000
2018-06-07 22:08:28,776 WARN  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Auto-commit of offsets {topic1-2=OffsetAndMetadata{offset=12300395550, metadata=''}, topic1-18=OffsetAndMetadata{offset=12299210444, metadata=''}, topic3-0=OffsetAndMetadata{offset=5064277287, metadata=''}, topic4-6=OffsetAndMetadata{offset=5492398559, metadata=''}, topic2-1=OffsetAndMetadata{offset=89817267, metadata=''}, topic1-10=OffsetAndMetadata{offset=12299742352, metadata=''}} failed for group aggregate-all_server_measurements_combined-20180606-1000: Offset commit failed with a retriable exception. You should retry committing offsets.
2018-06-07 22:08:29,840 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) dead for group aggregate-all_server_measurements_combined-20180606-1000
2018-06-07 22:08:29,841 WARN  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Auto-commit of offsets {topic1-6=OffsetAndMetadata{offset=12298347875, metadata=''}, topic4-2=OffsetAndMetadata{offset=5492779112, metadata=''}, topic1-14=OffsetAndMetadata{offset=12299972108, metadata=''}} failed for group aggregate-all_server_measurements_combined-20180606-1000: Offset commit failed with a retriable exception. You should retry committing offsets.