Flink and kerberos

Flink and kerberos

Vishwas Siravara
Hi guys,
I am using Kerberos for my Kafka source. I pass the JAAS config and krb5.conf via env.java.opts:

env.java.opts: -Dconfig.resource=qa.conf -Djava.library.path=/usr/mware/SimpleAPI/voltage-simple-api-java-05.12.0000-Linux-x86_64-64b-r234867/lib/ -Djava.security.auth.login.config=/home/was/Jaas/kafka-jaas.conf -Djava.security.krb5.conf=/home/was/Jaas/krb5.conf
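
For reference, my kafka-jaas.conf follows the usual GSSAPI KafkaClient layout, roughly like this (the keytab path and principal below are placeholders, not my real values):

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/user.keytab"
  principal="kafka-user@EXAMPLE.COM"
  serviceName="kafka";
};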

When I look at the debug logs, I see that the consumer was created with the following properties:
2019-08-29 06:49:18,298 INFO  org.apache.kafka.clients.consumer.ConsumerConfig              - ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [sl73oprdbd018.visa.com:9092]
        check.crcs = true
        client.id = consumer-2
        connections.max.idle.ms = 540000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = flink-AIP-XX-druid-List(gbl_auth_raw_occ_c)
        heartbeat.interval.ms = 3000
        interceptor.classes = null
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = SASL_PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

I can also see that the Kerberos login is working fine. Here is the log for it:

2019-08-29 06:49:18,312 INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
2019-08-29 06:49:18,313 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/[hidden email]]: TGT refresh thread started.
2019-08-29 06:49:18,314 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/[hidden email]]: TGT valid starting at: Thu Aug 29 06:49:18 GMT 2019
2019-08-29 06:49:18,314 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/[hidden email]]: TGT expires: Thu Aug 29 16:49:18 GMT 2019
2019-08-29 06:49:18,315 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/[hidden email]]: TGT refresh sleeping until: Thu Aug 29 15:00:10 GMT 2019
2019-08-29 06:49:18,316 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'zookeeper.connect' was supplied but isn't a known config.
2019-08-29 06:49:18,316 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.0
2019-08-29 06:49:18,316 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 576d93a8dc0cf421

I then see this log:
INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator sl73oprdbd017.visa.com:9092 (id: 2147482633 rack: null) dead for group flink-AIP-XX-druid-List(gbl_auth_raw_occ_c)


The problem is that I do not see any errors in the logs, yet no data is being processed by the consumer, and it has been a nightmare to debug.

Thanks for all the help.

Thanks,
Vishwas

Re: Flink and kerberos

Vishwas Siravara
I see this log as well, but I can't see any messages. I know for a fact that the topic I am subscribed to has messages, as I checked with a simple Java consumer in a different group.


 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 will start reading the following 40 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}]

Re: Flink and kerberos

aldu29
Hello Vishwas,

You can use a keytab if you prefer: generate a keytab for your user and reference it in the Flink configuration.
Flink will then handle this keytab in a secure way and create the TGT from it.
However, that part seems to be working already.
Did you check the Kafka logs on the broker side?
Did you check the consumer offsets with the Kafka tools, to validate that consumers are registered on the different partitions of your topic (see the sketch below)?
You could also try switching to a different group id for your consumer group, to force parallel consumption.
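
For example, something along these lines with the tool shipped in the Kafka distribution should show whether the group is registered and what its per-partition offsets and lag look like (group name taken from your log; depending on your Kafka version you may also need the --new-consumer flag, and client-sasl.properties here is a placeholder file that would carry the same SASL settings as your job):

bin/kafka-consumer-groups.sh --bootstrap-server sl73oprdbd018.visa.com:9092 \
  --describe --group "flink-AIP-XX-druid-List(gbl_auth_raw_occ_c)" \
  --command-config client-sasl.properties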

Re: Flink and kerberos

Vishwas Siravara
Hey David,
My consumers are registered; here is the debug log. The problem is that the broker does not belong to me, so I can't see what is going on there. But this is a new consumer group, so there is no state yet.

 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 will start reading the following 40 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}]

Re: Flink and kerberos

aldu29
Vishwas,

Here is a config that works on my Kerberized cluster (Flink on YARN).
I hope this will help you.

Flink conf: 
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /home/myuser/myuser.keytab
security.kerberos.login.principal: myuser@XXXX
security.kerberos.login.contexts: Client

Security-related properties, passed as an argument to the FlinkKafkaConsumerXX constructor:
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"myuser\" password=\"XXXX\";"
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
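
In code, wiring those properties into the consumer looks roughly like the sketch below (a minimal example assuming the 0.10 connector; topic name, bootstrap servers and group id are placeholders, and the SimpleStringSchema import path may differ slightly across Flink versions):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class SecureKafkaRead {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker-host:9092"); // placeholder
        props.setProperty("group.id", "my-consumer-group");         // placeholder

        // Security-related properties, mirroring the values above
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"myuser\" password=\"XXXX\";");

        // The properties object is handed straight to the consumer constructor
        env.addSource(new FlinkKafkaConsumer010<>("my_topic", new SimpleStringSchema(), props))
           .print();

        env.execute("secure-kafka-read");
    }
}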

Re: Flink and kerberos

Vishwas Siravara
Thanks, I'll check it out.
