Flink Kafka connector08 not updating the offsets with the zookeeper

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink Kafka connector08 not updating the offsets with the zookeeper

Anchit Jatana
Hi All,

I'm using Flink Kafka connector08. I need to check/monitor the offsets of the my flink application's kafka consumer.

When running this:

bin/kafka-consumer-groups.sh --zookeeper <zookeeper-url:port> --describe --group <group-id>

I get the message: No topic available for consumer group provided. Why is the consumer not updating the offsets with the zookeeper ? 

PS: I have enabled checkpointing. Is there any configuration that I'm missing or is this some sort of a bug? 

Using Flink version 1.1.2

Thank you

Regards,
Anchit
Reply | Threaded
Open this post in threaded view
|

Re: Flink Kafka connector08 not updating the offsets with the zookeeper

rmetzger0
Hi Anchit,

Can you re-run your job with the debug level for Flink set to DEBUG?
Then, you should see the following log message every time the offset is committed of Zookeeper:

"Committing offsets to Kafka/ZooKeeper for checkpoint"

Alternatively, can you check whether the offsets are available in Zookeeper once the first checkpoint completed?
They should be located in 

/consumers/<group.id>/offsets/<topic>/<partition>

In Flink 1.2-SNAPSHOT, Flink is exposing the current offset as a metric for all kafka connector versions.

Regards,
Robert


On Wed, Oct 12, 2016 at 2:35 AM, Anchit Jatana <[hidden email]> wrote:
Hi All,

I'm using Flink Kafka connector08. I need to check/monitor the offsets of the my flink application's kafka consumer.

When running this:

bin/kafka-consumer-groups.sh --zookeeper <zookeeper-url:port> --describe --group <group-id>

I get the message: No topic available for consumer group provided. Why is the consumer not updating the offsets with the zookeeper ? 

PS: I have enabled checkpointing. Is there any configuration that I'm missing or is this some sort of a bug? 

Using Flink version 1.1.2

Thank you

Regards,
Anchit

Reply | Threaded
Open this post in threaded view
|

Re: Flink Kafka connector08 not updating the offsets with the zookeeper

Anchit Jatana
Hi Robert,

Thanks for your response. I just figured out what the issue is.

The reason why- bin/kafka-consumer-groups.sh --zookeeper <zookeeper-url:port> --describe --group <group-id> is not showing any result is because of the absence of the

/consumers/<group-id>/owners/<topic> in the zookeeper.

The flink application is creating and updating /consumers/<group-id>/offsets/<topic>/<partition> but not creating "owners" Znode

If I manually create the Znode using the following:

create /consumers/<group-id>/owners “firstchildren”

create /consumers/<group-id>/owners/<topic> null

It works fine, bin/kafka-consumer-groups.sh --zookeeper <zookeeper-url:port> --describe --group <group-id> starts pulling offset results for me.

I think this needs to be corrected in the application: to check and create "/consumers/<group-id>/owners/<topic>" if it does not exist.

Regards,
Anchit
Reply | Threaded
Open this post in threaded view
|

Re: Flink Kafka connector08 not updating the offsets with the zookeeper

rmetzger0
Okay, I see.
According to this document, we need to set a consumer id for each groupid and topic: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper

I created a JIRA for fixing this issue: https://issues.apache.org/jira/browse/FLINK-4822



On Wed, Oct 12, 2016 at 8:08 PM, Anchit Jatana <[hidden email]> wrote:
Hi Robert,

Thanks for your response. I just figured out what the issue is.

The reason why- bin/kafka-consumer-groups.sh --zookeeper
<zookeeper-url:port> --describe --group <group-id> is not showing any result
is because of the absence of the

/consumers/<group-id>/owners/<topic> in the zookeeper.

The flink application is creating and updating
/consumers/<group-id>/offsets/<topic>/<partition> but not creating "owners"
Znode

If I manually create the Znode using the following:

create /consumers/<group-id>/owners “firstchildren”

create /consumers/<group-id>/owners/<topic> null

It works fine, bin/kafka-consumer-groups.sh --zookeeper <zookeeper-url:port>
--describe --group <group-id> starts pulling offset results for me.

I think this needs to be corrected in the application: to check and create
"/consumers/<group-id>/owners/<topic>" if it does not exist.

Regards,
Anchit



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-connector08-not-updating-the-offsets-with-the-zookeeper-tp9469p9498.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.