Issue Faced: Not able to get the consumer offsets from Kafka when using Flink with Flink-Kafka Connector


MAHESH KUMAR
Hi Team,

Kindly let me know if I am doing something wrong.

Kafka Version - kafka_2.11-0.10.1.1
Flink Version - flink-1.2.0
Using the latest Kafka Connector - FlinkKafkaConsumer010 - flink-connector-kafka-0.10_2.11

Issue Faced: Not able to get the consumer offsets from Kafka when using Flink with Flink-Kafka Connector

$ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092  --list
console-consumer-19886
console-consumer-89637
$

The list does not show the consumer group "test".

For a group that does not exist, the message is as follows: 

$ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test1 --describe
Consumer group `test1` does not exist.
$

For the "test" group, the error message is as follows:

$ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test --describe
Error while executing consumer group command Group test with protocol type '' is not a valid consumer group
java.lang.IllegalArgumentException: Group test with protocol type '' is not a valid consumer group
at kafka.admin.AdminClient.describeConsumerGroup(AdminClient.scala:152)
at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:308)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describe(ConsumerGroupCommand.scala:89)
at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describe(ConsumerGroupCommand.scala:296)
at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:68)
at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
$


    if (metadata.state != "Dead" && metadata.state != "Empty" && metadata.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
      throw new IllegalArgumentException(s"Consumer Group $groupId with protocol type '${metadata.protocolType}' is not a valid consumer group")
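
To make the failing condition easier to follow, here is a minimal Java transliteration of the Scala check quoted above. It is only a sketch: the class and method names are hypothetical, the state name "Stable" used in the example is an assumption (the actual state of the "test" group is not shown in this thread), and "consumer" is taken to be the value of ConsumerProtocol.PROTOCOL_TYPE.

```java
// Illustrative sketch only; not the Kafka source code.
final class GroupCheckSketch {
    // Assumed value of ConsumerProtocol.PROTOCOL_TYPE in the Kafka clients library.
    static final String CONSUMER_PROTOCOL_TYPE = "consumer";

    // Returns true when the describe command would accept the group,
    // false when it would throw the IllegalArgumentException shown above.
    static boolean passesCheck(String state, String protocolType) {
        boolean rejected = !state.equals("Dead")
                && !state.equals("Empty")
                && !protocolType.equals(CONSUMER_PROTOCOL_TYPE);
        return !rejected;
    }

    public static void main(String[] args) {
        System.out.println(passesCheck("Stable", "consumer")); // prints true
        System.out.println(passesCheck("Stable", ""));         // prints false
        System.out.println(passesCheck("Empty", ""));          // prints true
    }
}
```

A group that is neither Dead nor Empty and reports an empty protocol type, as in the error above, fails the check.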

Code:

import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaFlinkOutput {
    private static final String LOCAL_ZOOKEEPER_HOST = "localhost:2181";
    private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
    private static final String CONSUMER_GROUP = "test";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Consumer settings; the group id is the one queried with kafka-consumer-groups.sh.
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
        kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
        kafkaProps.setProperty("group.id", CONSUMER_GROUP);
        kafkaProps.setProperty("auto.offset.reset", "latest");

        // Checkpoint every second; offsets are committed when a checkpoint completes (see the logs).
        env.enableCheckpointing(1000L);

        // Read topic "testIn1" as strings and print each record.
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("testIn1", new SimpleStringSchema(), kafkaProps);
        DataStreamSource<String> consumerData = env.addSource(consumer);
        consumerData.print();
        System.out.println("Streaming Kafka in Flink");
        env.execute("Starting now!");
    }
}

Debug logs showing that the Kafka connector does commit offsets to Kafka:

2017-02-07 09:52:38,851 INFO  org.apache.kafka.clients.consumer.ConsumerConfig              - ConsumerConfig values: 
        metric.reporters = []
        metadata.max.age.ms = 300000
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [localhost:9092]
        ssl.keystore.type = JKS
        enable.auto.commit = true
        sasl.mechanism = GSSAPI
        interceptor.classes = null
        exclude.internal.topics = true
        ssl.truststore.password = null
        client.id = 
        ssl.endpoint.identification.algorithm = null
        max.poll.records = 2147483647
        check.crcs = true
        request.timeout.ms = 40000
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 65536
        ssl.truststore.type = JKS
        ssl.truststore.location = null
        ssl.keystore.password = null
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        group.id = test
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        session.timeout.ms = 30000
        metrics.num.samples = 2
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = PLAINTEXT
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        auto.offset.reset = latest


2017-02-07 09:53:38,524 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Sending heartbeat to JobManager
2017-02-07 09:53:38,731 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 12 @ 1486486418731
2017-02-07 09:53:38,731 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Receiver TriggerCheckpoint 12@1486486418731 for cfe59bc4aadc96e5a2235581460e9f3d.
2017-02-07 09:53:38,731 DEBUG org.apache.flink.runtime.taskmanager.Task                     - Invoking async call Checkpoint Trigger for Source: Custom Source -> Sink: Unnamed (1/4) (cfe59bc4aadc96e5a2235581460e9f3d). on task Source: Custom Source -> Sink: Unnamed (1/4)
2017-02-07 09:53:38,732 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Receiver TriggerCheckpoint 12@1486486418731 for 5611d20817d9a49680117c9ab000116c.
2017-02-07 09:53:38,732 DEBUG org.apache.flink.runtime.taskmanager.Task                     - Invoking async call Checkpoint Trigger for Source: Custom Source -> Sink: Unnamed (2/4) (5611d20817d9a49680117c9ab000116c). on task Source: Custom Source -> Sink: Unnamed (2/4)
2017-02-07 09:53:38,732 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Receiver TriggerCheckpoint 12@1486486418731 for 58e5bc1040fc99f8d3f0a32c2bd524b6.
2017-02-07 09:53:38,732 DEBUG org.apache.flink.runtime.taskmanager.Task                     - Invoking async call Checkpoint Trigger for Source: Custom Source -> Sink: Unnamed (3/4) (58e5bc1040fc99f8d3f0a32c2bd524b6). on task Source: Custom Source -> Sink: Unnamed (3/4)
2017-02-07 09:53:38,732 INFO  org.apache.flink.core.fs.FileSystem                           - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@26590268 for Async calls on Source: Custom Source -> Sink: Unnamed (1/4)
2017-02-07 09:53:38,732 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Starting checkpoint 12 on task Source: Custom Source -> Sink: Unnamed (1/4)
2017-02-07 09:53:38,732 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Receiver TriggerCheckpoint 12@1486486418731 for 95ad7256919d3296b37d17693cd0ba71.
2017-02-07 09:53:38,732 DEBUG org.apache.flink.runtime.taskmanager.Task                     - Invoking async call Checkpoint Trigger for Source: Custom Source -> Sink: Unnamed (4/4) (95ad7256919d3296b37d17693cd0ba71). on task Source: Custom Source -> Sink: Unnamed (4/4)
2017-02-07 09:53:38,732 INFO  org.apache.flink.core.fs.FileSystem                           - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@2b3c1766 for Async calls on Source: Custom Source -> Sink: Unnamed (2/4)
2017-02-07 09:53:38,732 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Starting checkpoint 12 on task Source: Custom Source -> Sink: Unnamed (2/4)
2017-02-07 09:53:38,732 INFO  org.apache.flink.core.fs.FileSystem                           - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@2c4eb75a for Async calls on Source: Custom Source -> Sink: Unnamed (4/4)
2017-02-07 09:53:38,732 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Starting checkpoint 12 on task Source: Custom Source -> Sink: Unnamed (4/4)
2017-02-07 09:53:38,732 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Finished synchronous checkpoints for checkpoint 12 on task Source: Custom Source -> Sink: Unnamed (2/4)
2017-02-07 09:53:38,732 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Source: Custom Source -> Sink: Unnamed (2/4) - finished synchronous part of checkpoint 12.Alignment duration: 0 ms, snapshot duration 0 ms
2017-02-07 09:53:38,732 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Finished synchronous checkpoints for checkpoint 12 on task Source: Custom Source -> Sink: Unnamed (1/4)
2017-02-07 09:53:38,732 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source -> Sink: Unnamed (2/4)
2017-02-07 09:53:38,732 INFO  org.apache.flink.core.fs.FileSystem                           - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@b0b9bf0 for Async calls on Source: Custom Source -> Sink: Unnamed (3/4)
2017-02-07 09:53:38,733 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Starting checkpoint 12 on task Source: Custom Source -> Sink: Unnamed (3/4)
2017-02-07 09:53:38,733 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Source: Custom Source -> Sink: Unnamed (1/4) - finished asynchronous part of checkpoint 12. Asynchronous duration: 0 ms
2017-02-07 09:53:38,732 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Finished synchronous checkpoints for checkpoint 12 on task Source: Custom Source -> Sink: Unnamed (4/4)
2017-02-07 09:53:38,733 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Finished synchronous checkpoints for checkpoint 12 on task Source: Custom Source -> Sink: Unnamed (3/4)
2017-02-07 09:53:38,733 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Source: Custom Source -> Sink: Unnamed (3/4) - finished synchronous part of checkpoint 12.Alignment duration: 0 ms, snapshot duration 0 ms
2017-02-07 09:53:38,733 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source -> Sink: Unnamed (3/4)
2017-02-07 09:53:38,733 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Source: Custom Source -> Sink: Unnamed (2/4) - finished asynchronous part of checkpoint 12. Asynchronous duration: 0 ms
2017-02-07 09:53:38,733 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Source: Custom Source -> Sink: Unnamed (3/4) - finished asynchronous part of checkpoint 12. Asynchronous duration: 0 ms
2017-02-07 09:53:38,732 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Source: Custom Source -> Sink: Unnamed (1/4) - finished synchronous part of checkpoint 12.Alignment duration: 0 ms, snapshot duration 0 ms
2017-02-07 09:53:38,733 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source -> Sink: Unnamed (1/4)
2017-02-07 09:53:38,733 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Source: Custom Source -> Sink: Unnamed (4/4) - finished asynchronous part of checkpoint 12. Asynchronous duration: 0 ms
2017-02-07 09:53:38,733 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received acknowledge message for checkpoint 12 from task 5611d20817d9a49680117c9ab000116c of job ef5998a3d1cbba24cb1790564f3037bf.
2017-02-07 09:53:38,733 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Source: Custom Source -> Sink: Unnamed (4/4) - finished synchronous part of checkpoint 12.Alignment duration: 0 ms, snapshot duration 0 ms
2017-02-07 09:53:38,734 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source -> Sink: Unnamed (4/4)
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received acknowledge message for checkpoint 12 from task 58e5bc1040fc99f8d3f0a32c2bd524b6 of job ef5998a3d1cbba24cb1790564f3037bf.
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received acknowledge message for checkpoint 12 from task 95ad7256919d3296b37d17693cd0ba71 of job ef5998a3d1cbba24cb1790564f3037bf.
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received acknowledge message for checkpoint 12 from task cfe59bc4aadc96e5a2235581460e9f3d of job ef5998a3d1cbba24cb1790564f3037bf.
2017-02-07 09:53:38,734 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 12 (4490 bytes in 3 ms).
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint state: TaskState(jobVertexID: cbc357ccb763df2852fee8c4fc7d55f2, parallelism: 4, sub task states: 4, total size (bytes): 4490)
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Receiver ConfirmCheckpoint 12@1486486418731 for cfe59bc4aadc96e5a2235581460e9f3d.
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.Task                     - Invoking async call Checkpoint Confirmation for Source: Custom Source -> Sink: Unnamed (1/4) on task Source: Custom Source -> Sink: Unnamed (1/4)
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Receiver ConfirmCheckpoint 12@1486486418731 for 5611d20817d9a49680117c9ab000116c.
2017-02-07 09:53:38,734 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Notification of complete checkpoint for task Source: Custom Source -> Sink: Unnamed (1/4)
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.Task                     - Invoking async call Checkpoint Confirmation for Source: Custom Source -> Sink: Unnamed (2/4) on task Source: Custom Source -> Sink: Unnamed (2/4)
2017-02-07 09:53:38,734 DEBUG org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Committing offsets to Kafka/ZooKeeper for checkpoint 12
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Receiver ConfirmCheckpoint 12@1486486418731 for 58e5bc1040fc99f8d3f0a32c2bd524b6.
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.Task                     - Invoking async call Checkpoint Confirmation for Source: Custom Source -> Sink: Unnamed (3/4) on task Source: Custom Source -> Sink: Unnamed (3/4)
2017-02-07 09:53:38,734 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Notification of complete checkpoint for task Source: Custom Source -> Sink: Unnamed (2/4)
2017-02-07 09:53:38,734 DEBUG org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Committing offsets to Kafka/ZooKeeper for checkpoint 12
2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Notification of complete checkpoint for task Source: Custom Source -> Sink: Unnamed (3/4)
2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Committing offsets to Kafka/ZooKeeper for checkpoint 12
2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - Sending async offset commit request to Kafka broker
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Receiver ConfirmCheckpoint 12@1486486418731 for 95ad7256919d3296b37d17693cd0ba71.
2017-02-07 09:53:38,735 DEBUG org.apache.flink.runtime.taskmanager.Task                     - Invoking async call Checkpoint Confirmation for Source: Custom Source -> Sink: Unnamed (4/4) on task Source: Custom Source -> Sink: Unnamed (4/4)
2017-02-07 09:53:38,735 DEBUG org.apache.kafka.clients.consumer.KafkaConsumer               - Committing offsets: {testIn1-1=OffsetAndMetadata{offset=17421, metadata=''}, testIn1-5=OffsetAndMetadata{offset=0, metadata=''}, testIn1-9=OffsetAndMetadata{offset=17493, metadata=''}} 
2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Notification of complete checkpoint for task Source: Custom Source -> Sink: Unnamed (4/4)
2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Committing offsets to Kafka/ZooKeeper for checkpoint 12
2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - Sending async offset commit request to Kafka broker
2017-02-07 09:53:38,735 DEBUG org.apache.kafka.clients.consumer.KafkaConsumer               - Committing offsets: {testIn1-0=OffsetAndMetadata{offset=34926, metadata=''}, testIn1-4=OffsetAndMetadata{offset=17325, metadata=''}, testIn1-8=OffsetAndMetadata{offset=17564, metadata=''}} 
2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - Sending async offset commit request to Kafka broker
2017-02-07 09:53:38,736 DEBUG org.apache.kafka.clients.consumer.KafkaConsumer               - Committing offsets: {testIn1-3=OffsetAndMetadata{offset=52292, metadata=''}, testIn1-7=OffsetAndMetadata{offset=0, metadata=''}} 
2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - Sending async offset commit request to Kafka broker
2017-02-07 09:53:38,736 DEBUG org.apache.kafka.clients.consumer.KafkaConsumer               - Committing offsets: {testIn1-2=OffsetAndMetadata{offset=0, metadata=''}, testIn1-6=OffsetAndMetadata{offset=17438, metadata=''}} 
2017-02-07 09:53:38,737 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 0 for partition testIn1-5
2017-02-07 09:53:38,737 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 17493 for partition testIn1-9
2017-02-07 09:53:38,737 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 17421 for partition testIn1-1
2017-02-07 09:53:38,737 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 17325 for partition testIn1-4
2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 17564 for partition testIn1-8
2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 34926 for partition testIn1-0
2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 0 for partition testIn1-7
2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 52292 for partition testIn1-3
2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 17438 for partition testIn1-6
2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 0 for partition testIn1-2
2017-02-07 09:53:43,524 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Sending heartbeat to JobManager
2017-02-07 09:53:43,730 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 13 @ 1486486423730
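
As a cross-check on what was committed, the ConsumerCoordinator lines above can be collected into a per-partition map. The following is a minimal sketch, assuming the log lines have exactly the "Group ... committed offset ... for partition ..." shape shown above; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the last committed offset per partition from debug logs.
final class CommittedOffsetLogParser {
    // Matches e.g. "Group test committed offset 17493 for partition testIn1-9".
    private static final Pattern COMMIT_LINE = Pattern.compile(
            "Group (\\S+) committed offset (\\d+) for partition (\\S+)");

    // Later lines overwrite earlier ones, so the map holds the most recent commit per partition.
    static Map<String, Long> parse(String group, Iterable<String> lines) {
        Map<String, Long> offsets = new TreeMap<>();
        for (String line : lines) {
            Matcher m = COMMIT_LINE.matcher(line);
            if (m.find() && m.group(1).equals(group)) {
                offsets.put(m.group(3), Long.parseLong(m.group(2)));
            }
        }
        return offsets;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "2017-02-07 09:53:38,737 DEBUG ...ConsumerCoordinator  - Group test committed offset 0 for partition testIn1-5",
                "2017-02-07 09:53:38,737 DEBUG ...ConsumerCoordinator  - Group test committed offset 17493 for partition testIn1-9");
        System.out.println(parse("test", sample)); // prints {testIn1-5=0, testIn1-9=17493}
    }
}
```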

--

Mahesh Kumar Ravindranathan                 
Data Streaming Engineer 
Oracle Marketing Cloud - Social Platform
Contact No: +1(720)492-4445

Re: Issue Faced: Not able to get the consumer offsets from Kafka when using Flink with Flink-Kafka Connector

rmetzger0
Hi Mahesh,

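This is a known limitation of Apache Kafka: https://www.mail-archive.com/users@.../msg22595.html
The kafka-consumer-groups.sh tool only describes groups whose members use Kafka's consumer group management, and the Flink connector assigns partitions itself, so the group reports an empty protocol type even though its offsets are committed.
You could implement a tool that manually retrieves the latest offset for the group from the __consumer_offsets topic.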
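
A first step for such a tool is finding which partition of the offsets topic holds the group's commits. The sketch below assumes the broker maps a group to a partition as the non-negative hash of the group id modulo the partition count, and that the offsets.topic.num.partitions setting is at its default of 50. Both are assumptions to verify against your broker, and the class and method names are hypothetical.

```java
// Hypothetical helper: locates the offsets-topic partition for a consumer group.
final class OffsetsTopicPartition {
    // Assumed default of the broker setting offsets.topic.num.partitions.
    static final int DEFAULT_OFFSETS_TOPIC_PARTITIONS = 50;

    static int partitionFor(String groupId, int offsetsTopicPartitions) {
        int hash = groupId.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative, so map that case to 0.
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("test", DEFAULT_OFFSETS_TOPIC_PARTITIONS)); // prints 48
    }
}
```

The tool would then read only that partition and decode the records for the group; the decoding step is not shown here.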

On Tue, Feb 7, 2017 at 6:24 PM, MAHESH KUMAR <[hidden email]> wrote:
2017-02-07 09:53:38,734 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source -> Sink: Unnamed (4/4)
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received acknowledge message for checkpoint 12 from task 58e5bc1040fc99f8d3f0a32c2bd524b6 of job ef5998a3d1cbba24cb1790564f3037bf.
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received acknowledge message for checkpoint 12 from task 95ad7256919d3296b37d17693cd0ba71 of job ef5998a3d1cbba24cb1790564f3037bf.
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received acknowledge message for checkpoint 12 from task cfe59bc4aadc96e5a2235581460e9f3d of job ef5998a3d1cbba24cb1790564f3037bf.
2017-02-07 09:53:38,734 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 12 (4490 bytes in 3 ms).
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint state: TaskState(jobVertexID: cbc357ccb763df2852fee8c4fc7d55f2, parallelism: 4, sub task states: 4, total size (bytes): 4490)
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Receiver ConfirmCheckpoint 12@1486486418731 for cfe59bc4aadc96e5a2235581460e9f3d.
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.Task                     - Invoking async call Checkpoint Confirmation for Source: Custom Source -> Sink: Unnamed (1/4) on task Source: Custom Source -> Sink: Unnamed (1/4)
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Receiver ConfirmCheckpoint 12@1486486418731 for 5611d20817d9a49680117c9ab000116c.
2017-02-07 09:53:38,734 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Notification of complete checkpoint for task Source: Custom Source -> Sink: Unnamed (1/4)
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.Task                     - Invoking async call Checkpoint Confirmation for Source: Custom Source -> Sink: Unnamed (2/4) on task Source: Custom Source -> Sink: Unnamed (2/4)
2017-02-07 09:53:38,734 DEBUG org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Committing offsets to Kafka/ZooKeeper for checkpoint 12
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Receiver ConfirmCheckpoint 12@1486486418731 for 58e5bc1040fc99f8d3f0a32c2bd524b6.
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.Task                     - Invoking async call Checkpoint Confirmation for Source: Custom Source -> Sink: Unnamed (3/4) on task Source: Custom Source -> Sink: Unnamed (3/4)
2017-02-07 09:53:38,734 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Notification of complete checkpoint for task Source: Custom Source -> Sink: Unnamed (2/4)
2017-02-07 09:53:38,734 DEBUG org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Committing offsets to Kafka/ZooKeeper for checkpoint 12
2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Notification of complete checkpoint for task Source: Custom Source -> Sink: Unnamed (3/4)
2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Committing offsets to Kafka/ZooKeeper for checkpoint 12
2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - Sending async offset commit request to Kafka broker
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Receiver ConfirmCheckpoint 12@1486486418731 for 95ad7256919d3296b37d17693cd0ba71.
2017-02-07 09:53:38,735 DEBUG org.apache.flink.runtime.taskmanager.Task                     - Invoking async call Checkpoint Confirmation for Source: Custom Source -> Sink: Unnamed (4/4) on task Source: Custom Source -> Sink: Unnamed (4/4)
2017-02-07 09:53:38,735 DEBUG org.apache.kafka.clients.consumer.KafkaConsumer               - Committing offsets: {testIn1-1=OffsetAndMetadata{offset=17421, metadata=''}, testIn1-5=OffsetAndMetadata{offset=0, metadata=''}, testIn1-9=OffsetAndMetadata{offset=17493, metadata=''}} 
2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Notification of complete checkpoint for task Source: Custom Source -> Sink: Unnamed (4/4)
2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Committing offsets to Kafka/ZooKeeper for checkpoint 12
2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - Sending async offset commit request to Kafka broker
2017-02-07 09:53:38,735 DEBUG org.apache.kafka.clients.consumer.KafkaConsumer               - Committing offsets: {testIn1-0=OffsetAndMetadata{offset=34926, metadata=''}, testIn1-4=OffsetAndMetadata{offset=17325, metadata=''}, testIn1-8=OffsetAndMetadata{offset=17564, metadata=''}} 
2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - Sending async offset commit request to Kafka broker
2017-02-07 09:53:38,736 DEBUG org.apache.kafka.clients.consumer.KafkaConsumer               - Committing offsets: {testIn1-3=OffsetAndMetadata{offset=52292, metadata=''}, testIn1-7=OffsetAndMetadata{offset=0, metadata=''}} 
2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - Sending async offset commit request to Kafka broker
2017-02-07 09:53:38,736 DEBUG org.apache.kafka.clients.consumer.KafkaConsumer               - Committing offsets: {testIn1-2=OffsetAndMetadata{offset=0, metadata=''}, testIn1-6=OffsetAndMetadata{offset=17438, metadata=''}} 
2017-02-07 09:53:38,737 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 0 for partition testIn1-5
2017-02-07 09:53:38,737 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 17493 for partition testIn1-9
2017-02-07 09:53:38,737 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 17421 for partition testIn1-1
2017-02-07 09:53:38,737 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 17325 for partition testIn1-4
2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 17564 for partition testIn1-8
2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 34926 for partition testIn1-0
2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 0 for partition testIn1-7
2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 52292 for partition testIn1-3
2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 17438 for partition testIn1-6
2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 0 for partition testIn1-2
2017-02-07 09:53:43,524 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Sending heartbeat to JobManager
2017-02-07 09:53:43,730 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 13 @ 1486486423730
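
Since kafka-consumer-groups.sh cannot describe the group, the ConsumerCoordinator DEBUG lines above are the only place in this thread where the committed offsets are visible. As a stopgap, they can be collected from the TaskManager log with a small parser. This is a minimal sketch using only the JDK; the class and method names are hypothetical, and it assumes the log line format stays exactly as shown above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: collects committed offsets from ConsumerCoordinator DEBUG lines. */
final class CommittedOffsetLogParser {

    // Matches e.g. "Group test committed offset 17493 for partition testIn1-9"
    private static final Pattern COMMIT_LINE =
            Pattern.compile("Group (\\S+) committed offset (\\d+) for partition (\\S+)\\s*$");

    /** Returns topic-partition -> last committed offset seen in the log for the given group. */
    static Map<String, Long> parse(List<String> logLines, String groupId) {
        Map<String, Long> offsets = new TreeMap<>();
        for (String line : logLines) {
            Matcher m = COMMIT_LINE.matcher(line);
            if (m.find() && m.group(1).equals(groupId)) {
                // Later lines overwrite earlier ones, so the newest commit wins.
                offsets.put(m.group(3), Long.parseLong(m.group(2)));
            }
        }
        return offsets;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "2017-02-07 09:53:38,737 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 17493 for partition testIn1-9",
                "2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 34926 for partition testIn1-0");
        parse(sample, "test").forEach((tp, offset) -> System.out.println(tp + " -> " + offset));
    }
}
```

This only works while DEBUG logging is enabled for org.apache.kafka.clients.consumer.internals, so it is a diagnostic aid rather than a monitoring solution.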

--

Mahesh Kumar Ravindranathan                 
Data Streaming Engineer 
Oracle Marketing Cloud - Social Platform
Contact No: +1(720)492-4445



Re: Issue Faced: Not able to get the consumer offsets from Kafka when using Flink with Flink-Kafka Connector

MAHESH KUMAR
Thanks for the prompt reply.

On Tue, Feb 7, 2017 at 10:38 AM, Robert Metzger <[hidden email]> wrote:
Hi Mahesh,

This is a known limitation of Apache Kafka: https://www.mail-archive.com/users@kafka.apache.org/msg22595.html
You could implement a tool that manually retrieves the latest committed offsets for the group from the __consumer_offsets topic.
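
A tool like that first has to know which partition of the internal offsets topic (named __consumer_offsets on the broker) holds the commits for a group. To my knowledge the broker picks it as the non-negative hashCode of the group id modulo offsets.topic.num.partitions, which defaults to 50. The sketch below reproduces that calculation with the JDK only; the class and method names are hypothetical, and the partition count is an assumption that must match the broker configuration.

```java
/** Sketch: which partition of the offsets topic holds the commits of a group. */
final class OffsetsTopicPartition {

    // Assumption: broker default for offsets.topic.num.partitions.
    static final int DEFAULT_OFFSETS_TOPIC_PARTITIONS = 50;

    /** Non-negative hash of the group id modulo the partition count. */
    static int partitionFor(String groupId, int offsetsTopicPartitions) {
        int hash = groupId.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative, so map that case to 0.
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        int partition = partitionFor("test", DEFAULT_OFFSETS_TOPIC_PARTITIONS);
        System.out.println("group test -> __consumer_offsets-" + partition);
    }
}
```

The records in that partition are stored in a binary format, so a real tool would still need to decode the keys and values after reading them.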

On Tue, Feb 7, 2017 at 6:24 PM, MAHESH KUMAR <[hidden email]> wrote:
Hi Team,

Kindly let me know if I am doing something wrong.

Kafka Version - kafka_2.11-0.10.1.1
Flink Version - flink-1.2.0
Using the latest Kafka Connector - FlinkKafkaConsumer010 - flink-connector-kafka-0.10_2.11

Issue Faced: Not able to get the consumer offsets from Kafka when using Flink with Flink-Kafka Connector

$ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092  --list
console-consumer-19886
console-consumer-89637
$

It does not show the consumer group "test"

For a group that does not exist, the message is as follows: 

$ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test1 --describe
Consumer group `test1` does not exist.
$

For the "test" group the error message is as follows

$ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test --describe
Error while executing consumer group command Group test with protocol type '' is not a valid consumer group
java.lang.IllegalArgumentException: Group test with protocol type '' is not a valid consumer group
at kafka.admin.AdminClient.describeConsumerGroup(AdminClient.scala:152)
at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:308)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describe(ConsumerGroupCommand.scala:89)
at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describe(ConsumerGroupCommand.scala:296)
at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:68)
at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
$


    if (metadata.state != "Dead" && metadata.state != "Empty" && metadata.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
      throw new IllegalArgumentException(s"Consumer Group $groupId with protocol type '${metadata.protocolType}' is not a valid consumer group")
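
The condition quoted above can be restated in Java to see which combinations are rejected. This is only an illustration of the check, not Kafka's code: the class and method names are hypothetical, and "Stable" is an assumed state because the thread does not show the actual state of group test. The empty protocol type is likely a consequence of the Flink connector committing offsets under the group id without joining the group as a member.

```java
/** Java restatement of the quoted AdminClient check (illustrative, not Kafka's code). */
final class ConsumerGroupCheck {

    // Value of ConsumerProtocol.PROTOCOL_TYPE in the Kafka client.
    static final String CONSUMER_PROTOCOL_TYPE = "consumer";

    /** True when the quoted check would throw for this state and protocol type. */
    static boolean isRejected(String state, String protocolType) {
        return !state.equals("Dead")
                && !state.equals("Empty")
                && !protocolType.equals(CONSUMER_PROTOCOL_TYPE);
    }

    public static void main(String[] args) {
        // Empty protocol type as in the error message for group test; the state is assumed.
        System.out.println(isRejected("Stable", ""));
        // A group whose members joined through the regular consumer protocol.
        System.out.println(isRejected("Stable", "consumer"));
        // Dead or Empty groups skip the protocol type check entirely.
        System.out.println(isRejected("Empty", ""));
    }
}
```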

Code:

import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaFlinkOutput {
    private static final String LOCAL_ZOOKEEPER_HOST = "localhost:2181";
    private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
    private static final String CONSUMER_GROUP = "test";
    private static final String TOPIC = "testIn1";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every second; the connector commits offsets to Kafka when a checkpoint completes.
        env.enableCheckpointing(1000L);

        Properties kafkaProps = new Properties();
        // zookeeper.connect is only required by the Kafka 0.8 connector; the 0.10 consumer does not use it.
        kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
        kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
        kafkaProps.setProperty("group.id", CONSUMER_GROUP);
        kafkaProps.setProperty("auto.offset.reset", "latest");

        // Typed consumer and stream instead of raw types.
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>(TOPIC, new SimpleStringSchema(), kafkaProps);
        DataStreamSource<String> consumerData = env.addSource(consumer);
        consumerData.print();

        System.out.println("Streaming Kafka in Flink");
        env.execute("Starting now!");
    }
}

Debug Logs that show that Kafka Connector does commit to Kafka:

2017-02-07 09:52:38,851 INFO  org.apache.kafka.clients.consumer.ConsumerConfig              - ConsumerConfig values: 
        metric.reporters = []
        metadata.max.age.ms = 300000
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [localhost:9092]
        ssl.keystore.type = JKS
        enable.auto.commit = true
        sasl.mechanism = GSSAPI
        interceptor.classes = null
        exclude.internal.topics = true
        ssl.truststore.password = null
        client.id = 
        ssl.endpoint.identification.algorithm = null
        max.poll.records = 2147483647
        check.crcs = true
        request.timeout.ms = 40000
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 65536
        ssl.truststore.type = JKS
        ssl.truststore.location = null
        ssl.keystore.password = null
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        group.id = test
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        session.timeout.ms = 30000
        metrics.num.samples = 2
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = PLAINTEXT
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        auto.offset.reset = latest


2017-02-07 09:53:38,524 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Sending heartbeat to JobManager
2017-02-07 09:53:38,731 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 12 @ 1486486418731
2017-02-07 09:53:38,731 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Receiver TriggerCheckpoint 12@1486486418731 for cfe59bc4aadc96e5a2235581460e9f3d.
2017-02-07 09:53:38,731 DEBUG org.apache.flink.runtime.taskmanager.Task                     - Invoking async call Checkpoint Trigger for Source: Custom Source -> Sink: Unnamed (1/4) (cfe59bc4aadc96e5a2235581460e9f3d). on task Source: Custom Source -> Sink: Unnamed (1/4)
2017-02-07 09:53:38,732 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Receiver TriggerCheckpoint 12@1486486418731 for 5611d20817d9a49680117c9ab000116c.
2017-02-07 09:53:38,732 DEBUG org.apache.flink.runtime.taskmanager.Task                     - Invoking async call Checkpoint Trigger for Source: Custom Source -> Sink: Unnamed (2/4) (5611d20817d9a49680117c9ab000116c). on task Source: Custom Source -> Sink: Unnamed (2/4)
2017-02-07 09:53:38,732 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Receiver TriggerCheckpoint 12@1486486418731 for 58e5bc1040fc99f8d3f0a32c2bd524b6.
2017-02-07 09:53:38,732 DEBUG org.apache.flink.runtime.taskmanager.Task                     - Invoking async call Checkpoint Trigger for Source: Custom Source -> Sink: Unnamed (3/4) (58e5bc1040fc99f8d3f0a32c2bd524b6). on task Source: Custom Source -> Sink: Unnamed (3/4)
2017-02-07 09:53:38,732 INFO  org.apache.flink.core.fs.FileSystem                           - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@26590268 for Async calls on Source: Custom Source -> Sink: Unnamed (1/4)
2017-02-07 09:53:38,732 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Starting checkpoint 12 on task Source: Custom Source -> Sink: Unnamed (1/4)
2017-02-07 09:53:38,732 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Receiver TriggerCheckpoint 12@1486486418731 for 95ad7256919d3296b37d17693cd0ba71.
2017-02-07 09:53:38,732 DEBUG org.apache.flink.runtime.taskmanager.Task                     - Invoking async call Checkpoint Trigger for Source: Custom Source -> Sink: Unnamed (4/4) (95ad7256919d3296b37d17693cd0ba71). on task Source: Custom Source -> Sink: Unnamed (4/4)
2017-02-07 09:53:38,732 INFO  org.apache.flink.core.fs.FileSystem                           - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@2b3c1766 for Async calls on Source: Custom Source -> Sink: Unnamed (2/4)
2017-02-07 09:53:38,732 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Starting checkpoint 12 on task Source: Custom Source -> Sink: Unnamed (2/4)
2017-02-07 09:53:38,732 INFO  org.apache.flink.core.fs.FileSystem                           - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@2c4eb75a for Async calls on Source: Custom Source -> Sink: Unnamed (4/4)
2017-02-07 09:53:38,732 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Starting checkpoint 12 on task Source: Custom Source -> Sink: Unnamed (4/4)
2017-02-07 09:53:38,732 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Finished synchronous checkpoints for checkpoint 12 on task Source: Custom Source -> Sink: Unnamed (2/4)
2017-02-07 09:53:38,732 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Source: Custom Source -> Sink: Unnamed (2/4) - finished synchronous part of checkpoint 12.Alignment duration: 0 ms, snapshot duration 0 ms
2017-02-07 09:53:38,732 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Finished synchronous checkpoints for checkpoint 12 on task Source: Custom Source -> Sink: Unnamed (1/4)
2017-02-07 09:53:38,732 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source -> Sink: Unnamed (2/4)
2017-02-07 09:53:38,732 INFO  org.apache.flink.core.fs.FileSystem                           - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@b0b9bf0 for Async calls on Source: Custom Source -> Sink: Unnamed (3/4)
2017-02-07 09:53:38,733 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Starting checkpoint 12 on task Source: Custom Source -> Sink: Unnamed (3/4)
2017-02-07 09:53:38,733 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Source: Custom Source -> Sink: Unnamed (1/4) - finished asynchronous part of checkpoint 12. Asynchronous duration: 0 ms
2017-02-07 09:53:38,732 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Finished synchronous checkpoints for checkpoint 12 on task Source: Custom Source -> Sink: Unnamed (4/4)
2017-02-07 09:53:38,733 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Finished synchronous checkpoints for checkpoint 12 on task Source: Custom Source -> Sink: Unnamed (3/4)
2017-02-07 09:53:38,733 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Source: Custom Source -> Sink: Unnamed (3/4) - finished synchronous part of checkpoint 12.Alignment duration: 0 ms, snapshot duration 0 ms
2017-02-07 09:53:38,733 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source -> Sink: Unnamed (3/4)
2017-02-07 09:53:38,733 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Source: Custom Source -> Sink: Unnamed (2/4) - finished asynchronous part of checkpoint 12. Asynchronous duration: 0 ms
2017-02-07 09:53:38,733 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Source: Custom Source -> Sink: Unnamed (3/4) - finished asynchronous part of checkpoint 12. Asynchronous duration: 0 ms
2017-02-07 09:53:38,732 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Source: Custom Source -> Sink: Unnamed (1/4) - finished synchronous part of checkpoint 12.Alignment duration: 0 ms, snapshot duration 0 ms
2017-02-07 09:53:38,733 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source -> Sink: Unnamed (1/4)
2017-02-07 09:53:38,733 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Source: Custom Source -> Sink: Unnamed (4/4) - finished asynchronous part of checkpoint 12. Asynchronous duration: 0 ms
2017-02-07 09:53:38,733 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received acknowledge message for checkpoint 12 from task 5611d20817d9a49680117c9ab000116c of job ef5998a3d1cbba24cb1790564f3037bf.
2017-02-07 09:53:38,733 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Source: Custom Source -> Sink: Unnamed (4/4) - finished synchronous part of checkpoint 12.Alignment duration: 0 ms, snapshot duration 0 ms
2017-02-07 09:53:38,734 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source -> Sink: Unnamed (4/4)
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received acknowledge message for checkpoint 12 from task 58e5bc1040fc99f8d3f0a32c2bd524b6 of job ef5998a3d1cbba24cb1790564f3037bf.
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received acknowledge message for checkpoint 12 from task 95ad7256919d3296b37d17693cd0ba71 of job ef5998a3d1cbba24cb1790564f3037bf.
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received acknowledge message for checkpoint 12 from task cfe59bc4aadc96e5a2235581460e9f3d of job ef5998a3d1cbba24cb1790564f3037bf.
2017-02-07 09:53:38,734 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 12 (4490 bytes in 3 ms).
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint state: TaskState(jobVertexID: cbc357ccb763df2852fee8c4fc7d55f2, parallelism: 4, sub task states: 4, total size (bytes): 4490)
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Receiver ConfirmCheckpoint 12@1486486418731 for cfe59bc4aadc96e5a2235581460e9f3d.
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.Task                     - Invoking async call Checkpoint Confirmation for Source: Custom Source -> Sink: Unnamed (1/4) on task Source: Custom Source -> Sink: Unnamed (1/4)
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Receiver ConfirmCheckpoint 12@1486486418731 for 5611d20817d9a49680117c9ab000116c.
2017-02-07 09:53:38,734 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Notification of complete checkpoint for task Source: Custom Source -> Sink: Unnamed (1/4)
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.Task                     - Invoking async call Checkpoint Confirmation for Source: Custom Source -> Sink: Unnamed (2/4) on task Source: Custom Source -> Sink: Unnamed (2/4)
2017-02-07 09:53:38,734 DEBUG org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Committing offsets to Kafka/ZooKeeper for checkpoint 12
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Receiver ConfirmCheckpoint 12@1486486418731 for 58e5bc1040fc99f8d3f0a32c2bd524b6.
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.Task                     - Invoking async call Checkpoint Confirmation for Source: Custom Source -> Sink: Unnamed (3/4) on task Source: Custom Source -> Sink: Unnamed (3/4)
2017-02-07 09:53:38,734 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Notification of complete checkpoint for task Source: Custom Source -> Sink: Unnamed (2/4)
2017-02-07 09:53:38,734 DEBUG org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Committing offsets to Kafka/ZooKeeper for checkpoint 12
2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Notification of complete checkpoint for task Source: Custom Source -> Sink: Unnamed (3/4)
2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Committing offsets to Kafka/ZooKeeper for checkpoint 12
2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - Sending async offset commit request to Kafka broker
2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Receiver ConfirmCheckpoint 12@1486486418731 for 95ad7256919d3296b37d17693cd0ba71.
2017-02-07 09:53:38,735 DEBUG org.apache.flink.runtime.taskmanager.Task                     - Invoking async call Checkpoint Confirmation for Source: Custom Source -> Sink: Unnamed (4/4) on task Source: Custom Source -> Sink: Unnamed (4/4)
2017-02-07 09:53:38,735 DEBUG org.apache.kafka.clients.consumer.KafkaConsumer               - Committing offsets: {testIn1-1=OffsetAndMetadata{offset=17421, metadata=''}, testIn1-5=OffsetAndMetadata{offset=0, metadata=''}, testIn1-9=OffsetAndMetadata{offset=17493, metadata=''}} 
2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Notification of complete checkpoint for task Source: Custom Source -> Sink: Unnamed (4/4)
2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Committing offsets to Kafka/ZooKeeper for checkpoint 12
2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - Sending async offset commit request to Kafka broker
2017-02-07 09:53:38,735 DEBUG org.apache.kafka.clients.consumer.KafkaConsumer               - Committing offsets: {testIn1-0=OffsetAndMetadata{offset=34926, metadata=''}, testIn1-4=OffsetAndMetadata{offset=17325, metadata=''}, testIn1-8=OffsetAndMetadata{offset=17564, metadata=''}} 
2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - Sending async offset commit request to Kafka broker
2017-02-07 09:53:38,736 DEBUG org.apache.kafka.clients.consumer.KafkaConsumer               - Committing offsets: {testIn1-3=OffsetAndMetadata{offset=52292, metadata=''}, testIn1-7=OffsetAndMetadata{offset=0, metadata=''}} 
2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - Sending async offset commit request to Kafka broker
2017-02-07 09:53:38,736 DEBUG org.apache.kafka.clients.consumer.KafkaConsumer               - Committing offsets: {testIn1-2=OffsetAndMetadata{offset=0, metadata=''}, testIn1-6=OffsetAndMetadata{offset=17438, metadata=''}} 
2017-02-07 09:53:38,737 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 0 for partition testIn1-5
2017-02-07 09:53:38,737 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 17493 for partition testIn1-9
2017-02-07 09:53:38,737 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 17421 for partition testIn1-1
2017-02-07 09:53:38,737 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 17325 for partition testIn1-4
2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 17564 for partition testIn1-8
2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 34926 for partition testIn1-0
2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 0 for partition testIn1-7
2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 52292 for partition testIn1-3
2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 17438 for partition testIn1-6
2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group test committed offset 0 for partition testIn1-2
2017-02-07 09:53:43,524 DEBUG org.apache.flink.runtime.taskmanager.TaskManager              - Sending heartbeat to JobManager
2017-02-07 09:53:43,730 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 13 @ 1486486423730

--

Mahesh Kumar Ravindranathan                 
Data Streaming Engineer 
Oracle Marketing Cloud - Social Platform
Contact No: +1(720)492-4445




