I'm trying to use Stateful Functions with Kafka as my ingress and egress. I'm using the Confluent fully-managed Kafka and I'm having a challenge adding my authentication details in the module.yaml file. Here is my current config details: version: "1.0" module: meta: type: remote spec: functions: - function: meta: kind: http type: example/greeter spec: endpoint: <https-endpoint> states: - seen_count maxNumBatchRequests: 500 timeout: 2min ingresses: - ingress: meta: type: statefun.kafka.io/routable-protobuf-ingress id: example/names spec: address: <confluent-bootstrap-server> consumerGroupId: statefun-consumer-group topics: - topic: names typeUrl: com.googleapis/example.GreetRequest targets: - example/greeter properties: - bootstrap.servers:<confluent-bootstrap-server> - security.protocol: SASL_SSL - sasl.mechanism: PLAIN - sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD"; - ssl.endpoint.identification.algorithm: https egresses: - egress: meta: type: statefun.kafka.io/generic-egress id: example/greets spec: address: <confluent-bootstrap-server> deliverySemantic: type: exactly-once transactionTimeoutMillis: 100000 properties: - bootstrap.servers: <confluent-bootstrap-server> - security.protocol: SASL_SSL - sasl.mechanisms: PLAIN - sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD"; - ssl.endpoint.identification.algorithm: https After running docker-compose with a master and worker containers I'm getting this error: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is /tmp/jaas-2846080966990890307.conf The producer config logged : worker_1 | 2020-10-07 13:38:08,489 INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: worker_1 | acks = 1 worker_1 | batch.size = 16384 worker_1 | bootstrap.servers = [https://---.asia-southeast1.gcp.confluent.cloud:9092] worker_1 | buffer.memory = 33554432 worker_1 | client.dns.lookup = default worker_1 | client.id = worker_1 | compression.type = none worker_1 | connections.max.idle.ms = 540000 worker_1 | delivery.timeout.ms = 120000 worker_1 | enable.idempotence = false worker_1 | interceptor.classes = [] worker_1 | key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer worker_1 | linger.ms = 0 worker_1 | max.block.ms = 60000 worker_1 | max.in.flight.requests.per.connection = 5 worker_1 | max.request.size = 1048576 worker_1 | metadata.max.age.ms = 300000 worker_1 | metric.reporters = [] worker_1 | metrics.num.samples = 2 worker_1 | metrics.recording.level = INFO worker_1 | metrics.sample.window.ms = 30000 worker_1 | partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner worker_1 | receive.buffer.bytes = 32768 worker_1 | reconnect.backoff.max.ms = 1000 worker_1 | reconnect.backoff.ms = 50 worker_1 | request.timeout.ms = 30000 worker_1 | retries = 2147483647 worker_1 | retry.backoff.ms = 100 worker_1 | sasl.client.callback.handler.class = null worker_1 | sasl.jaas.config = null worker_1 | sasl.kerberos.kinit.cmd = /usr/bin/kinit worker_1 | sasl.kerberos.min.time.before.relogin = 60000 worker_1 | sasl.kerberos.service.name = null worker_1 | sasl.kerberos.ticket.renew.jitter = 0.05 worker_1 | sasl.kerberos.ticket.renew.window.factor = 0.8 worker_1 | sasl.login.callback.handler.class = null worker_1 | sasl.login.class = null worker_1 | sasl.login.refresh.buffer.seconds = 300 worker_1 | sasl.login.refresh.min.period.seconds = 60 worker_1 | sasl.login.refresh.window.factor = 0.8 worker_1 | sasl.login.refresh.window.jitter = 0.05 worker_1 | sasl.mechanism = GSSAPI worker_1 | security.protocol = SASL_SSL worker_1 | send.buffer.bytes = 131072 worker_1 | ssl.cipher.suites = null worker_1 | ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] worker_1 | ssl.endpoint.identification.algorithm = https worker_1 | ssl.key.password = null worker_1 | ssl.keymanager.algorithm = SunX509 worker_1 | ssl.keystore.location = null worker_1 | ssl.keystore.password = null worker_1 | ssl.keystore.type = JKS worker_1 | ssl.protocol = TLS worker_1 | ssl.provider = null worker_1 | ssl.secure.random.implementation = null worker_1 | ssl.trustmanager.algorithm = PKIX worker_1 | ssl.truststore.location = null worker_1 | ssl.truststore.password = null worker_1 | ssl.truststore.type = JKS worker_1 | transaction.timeout.ms = 100000 worker_1 | transactional.id = null worker_1 | value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer worker_1 | Is there something that I'm missing? |
Hi Hezekiah, thanks for reporting this issue. I am pulling Gordon and Igal in who might be able to help you with this problem. Cheers, Till
Hi Hezekiah, I've confirmed that the Kafka properties set in the module specification file (module.yaml) are indeed correctly being parsed and used to construct the internal Kafka clients. StateFun / Flink does not alter or modify the properties. So, this should be something wrong with your property settings, and causing the Kafka client itself to not pick up the `sasl.jaas.config` property value. From the resolved producer config in the logs, it looks like your `sasl.jaas.config` is null, but all other properties are being picked up correctly. Please check your properties again, and make sure their keys are correct and values conform to the JAAS config formats. For starters, there's a typo in your `sasl.mechanism` config, you've mis-typed an extra 's'. I've verified that the following properties will work, with SASL JAAS config being picked up correctly: ``` egresses: - egress: meta: type: statefun.kafka.io/generic-egress id: example/greets spec: address: <confluent-bootstrap-server> deliverySemantic: type: exactly-once transactionTimeoutMillis: 100000 properties: - security.protocol: SASL_SSL - sasl.mechanism: PLAIN - sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD"; - ssl.endpoint.identification.algorithm: https ``` Cheers, Gordon
