Hi Team, Kindly let me know if I am doing something wrong. Kafka Version - kafka_2.11-0.10.1.1 Flink Version - flink-1.2.0 Using the latest Kafka Connector - FlinkKafkaConsumer010 - flink-connector-kafka-0.10_2. Issue Faced: Not able to get the consumer offsets from Kafka when using Flink with Flink-Kafka Connector $ /work/kafka_2.11-0.10.1.1/bin/ console-consumer-19886 console-consumer-89637 $ It does not show the consumer group "test" For a group that does not exist, the message is as follows: $ /work/kafka_2.11-0.10.1.1/bin/ Consumer group `test1` does not exist. $ For the "test" group the error message is as follows $ /work/kafka_2.11-0.10.1.1/bin/ Error while executing consumer group command Group test with protocol type '' is not a valid consumer group java.lang. at kafka.admin.AdminClient. at kafka.admin. at kafka.admin. at kafka.admin. at kafka.admin. at kafka.admin. $ The error is from the AdminClient.scala (https://github.com/apache/ if (metadata.state != "Dead" && metadata.state != "Empty" && metadata.protocolType != ConsumerProtocol.PROTOCOL_ throw new IllegalArgumentException(s" Code: import java.util.Properties; import org.apache.flink.streaming. import org.apache.flink.streaming. import org.apache.flink.streaming. import org.apache.flink.streaming. public class KafkaFlinkOutput { private static final String LOCAL_ZOOKEEPER_HOST = "localhost:2181"; private static final String LOCAL_KAFKA_BROKER = "localhost:9092"; private static final String CONSUMER_GROUP = "test"; public KafkaFlinkOutput() { } public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment. Properties kafkaProps = new Properties(); kafkaProps.setProperty(" kafkaProps.setProperty(" kafkaProps.setProperty("group. kafkaProps.setProperty("auto. env.enableCheckpointing(1000L) FlinkKafkaConsumer010 consumer = new FlinkKafkaConsumer010(" DataStreamSource consumerData = env.addSource(consumer); consumerData.print(); System.out.println("Streaming Kafka in Flink"); env.execute("Starting now!"); } } Debug Logs that show that Kafka Connector does commit to Kafka: 2017-02-07 09:52:38,851 INFO org.apache.kafka.clients. metric.reporters = [] metadata.max.age.ms = 300000 partition.assignment.strategy = [org.apache.kafka.clients. reconnect.backoff.ms = 50 sasl.kerberos.ticket.renew. max.partition.fetch.bytes = 1048576 bootstrap.servers = [localhost:9092] ssl.keystore.type = JKS enable.auto.commit = true sasl.mechanism = GSSAPI interceptor.classes = null exclude.internal.topics = true ssl.truststore.password = null ssl.endpoint.identification. max.poll.records = <a href="tel:02147%20483%20647" value="+912147483647" target="_blank">2147483647 check.crcs = true request.timeout.ms = 40000 heartbeat.interval.ms = 3000 auto.commit.interval.ms = 5000 receive.buffer.bytes = 65536 ssl.truststore.type = JKS ssl.truststore.location = null ssl.keystore.password = null fetch.min.bytes = 1 send.buffer.bytes = 131072 value.deserializer = class org.apache.kafka.common. group.id = test retry.backoff.ms = 100 sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.service.name = null sasl.kerberos.ticket.renew. ssl.trustmanager.algorithm = PKIX ssl.key.password = null fetch.max.wait.ms = 500 sasl.kerberos.min.time.before. connections.max.idle.ms = 540000 session.timeout.ms = 30000 metrics.num.samples = 2 key.deserializer = class org.apache.kafka.common. ssl.protocol = TLS ssl.provider = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.keystore.location = null ssl.cipher.suites = null security.protocol = PLAINTEXT ssl.keymanager.algorithm = SunX509 metrics.sample.window.ms = 30000 auto.offset.reset = latest 2017-02-07 09:53:38,524 DEBUG org.apache.flink.runtime. 2017-02-07 09:53:38,731 INFO org.apache.flink.runtime. 2017-02-07 09:53:38,731 DEBUG org.apache.flink.runtime. 2017-02-07 09:53:38,731 DEBUG org.apache.flink.runtime. 2017-02-07 09:53:38,732 DEBUG org.apache.flink.runtime. 2017-02-07 09:53:38,732 DEBUG org.apache.flink.runtime. 2017-02-07 09:53:38,732 DEBUG org.apache.flink.runtime. 2017-02-07 09:53:38,732 DEBUG org.apache.flink.runtime. 2017-02-07 09:53:38,732 INFO org.apache.flink.core.fs. 2017-02-07 09:53:38,732 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,732 DEBUG org.apache.flink.runtime. 2017-02-07 09:53:38,732 DEBUG org.apache.flink.runtime. 2017-02-07 09:53:38,732 INFO org.apache.flink.core.fs. 2017-02-07 09:53:38,732 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,732 INFO org.apache.flink.core.fs. 2017-02-07 09:53:38,732 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,732 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,732 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,732 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,732 INFO org.apache.flink.core.fs. 2017-02-07 09:53:38,732 INFO org.apache.flink.core.fs. 2017-02-07 09:53:38,733 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,733 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,732 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,733 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,733 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,733 INFO org.apache.flink.core.fs. 2017-02-07 09:53:38,733 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,733 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,732 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,733 INFO org.apache.flink.core.fs. 2017-02-07 09:53:38,733 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,733 DEBUG org.apache.flink.runtime. 2017-02-07 09:53:38,733 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,734 INFO org.apache.flink.core.fs. 2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime. 2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime. 2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime. 2017-02-07 09:53:38,734 INFO org.apache.flink.runtime. 2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime. 2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime. 2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime. 2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime. 2017-02-07 09:53:38,734 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime. 2017-02-07 09:53:38,734 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime. 2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime. 2017-02-07 09:53:38,734 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,734 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime. 2017-02-07 09:53:38,735 DEBUG org.apache.flink.runtime. 2017-02-07 09:53:38,735 DEBUG org.apache.kafka.clients. 2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,735 DEBUG org.apache.kafka.clients. 2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,736 DEBUG org.apache.kafka.clients. 2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming. 2017-02-07 09:53:38,736 DEBUG org.apache.kafka.clients. 2017-02-07 09:53:38,737 DEBUG org.apache.kafka.clients. 2017-02-07 09:53:38,737 DEBUG org.apache.kafka.clients. 2017-02-07 09:53:38,737 DEBUG org.apache.kafka.clients. 2017-02-07 09:53:38,737 DEBUG org.apache.kafka.clients. 2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients. 2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients. 2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients. 2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients. 2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients. 2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients. 2017-02-07 09:53:43,524 DEBUG org.apache.flink.runtime. 2017-02-07 09:53:43,730 INFO org.apache.flink.runtime. Mahesh Kumar Ravindranathan Data Streaming Engineer Oracle Marketing Cloud - Social Platform Contact No:<a href="tel:%2B1%28720%29492-4445" value="+17204924445" style="color:rgb(17,85,204)" target="_blank">+1(720)492-4445 |
Hi Mahesh, this is a known limitation of Apache Kafka: https://www.mail-archive.com/users@.../msg22595.html You could implement a tool that is manually retrieving the latest offset for the group from the __offsets topic. On Tue, Feb 7, 2017 at 6:24 PM, MAHESH KUMAR <[hidden email]> wrote:
|
Thanks for the prompt reply On Tue, Feb 7, 2017 at 10:38 AM, Robert Metzger <[hidden email]> wrote:
Mahesh Kumar Ravindranathan Data Streaming Engineer Oracle Marketing Cloud - Social Platform Contact No:<a href="tel:%2B1%28720%29492-4445" value="+17204924445" style="color:rgb(17,85,204)" target="_blank">+1(720)492-4445 |
Free forum by Nabble | Edit this page |