Hi,

This is a repost with a modified subject, per Sri Tummala's suggestion.

I'm running Flink 1.11 on EMR and would like to read from Kafka via SSL. I tried to put the keystore.jks location under /usr/lib/flink/..., like:

export SSL_KEYSTORE_LOCATION=/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks

Note that this is on the EMR master node; both the JM and the TMs are on EMR core (slave) nodes.

Here is the code snippet:

val stmt =
  s"""
     |create table ${table.name} (${schema}, ${watermark})
     |with (
     |  'connector' = 'kafka',
     |  'topic' = '${table.topic}',
     |  'scan.startup.mode' = '${table.scanStartUpMode}',
     |  'properties.zookeeper.connect' = 'xxx',
     |  'properties.bootstrap.servers' = 'xxx',
     |  'properties.ssl.keystore.location' = '/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks',
     |  'properties.ssl.keystore.password' = 'xxx',
     |  'properties.ssl.key.password' = 'xxx',
     |  'properties.security.protocol' = 'SSL',
     |  'properties.ssl.keystore.type' = 'JKS',
     |  'properties.ssl.truststore.type' = 'JKS',
     |  'properties.ssl.enabled.protocols' = 'TLSv1.2,TLSv1.1,TLSv1',
     |  'properties.group.id' = '${table.name}_group_id',
     |  'format' = 'json',
     |  'json.ignore-parse-errors' = 'true'
     |)
   """.stripMargin

tEnv.executeSql(stmt)

However, I got an exception:

Caused by: java.nio.file.NoSuchFileException: /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks

even though the file is there:

[hadoop@ip-10-200-41-39 flink]$ ll /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
-rw-r--r-- 1 root root 5565 Nov 17 22:24 /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks

Things I tried:
1. The keystore.jks file itself works, since I can use console-consumer to read Kafka topics on the EMR master.
2. Setting the location to s3://my-bucket/keystore.jks, which did not work either.

What value should I set the keystore location to? Thanks!
Fanbin

Also attached is the full exception log:

2020-11-17 09:35:49
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks of type JKS
    at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:71)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:741)
    ... 15 more
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks of type JKS
    at org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:163)
    at org.apache.kafka.common.security.ssl.SslEngineBuilder.<init>(SslEngineBuilder.java:104)
    at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95)
    at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:69)
    ... 19 more
Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks of type JKS
    at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:292)
    at org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:144)
    ... 22 more
Caused by: java.nio.file.NoSuchFileException: /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
    at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
    at java.nio.file.Files.newByteChannel(Files.java:361)
    at java.nio.file.Files.newByteChannel(Files.java:407)
    at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
    at java.nio.file.Files.newInputStream(Files.java:152)
    at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:285)
I have to put the keystore file on the worker nodes.

On Wed, Nov 18, 2020 at 4:29 PM Fanbin Bu <[hidden email]> wrote:
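For anyone else hitting this: the Kafka consumer is constructed inside the TaskManager JVMs (see KafkaPartitionDiscoverer in the trace above), so the path given in 'properties.ssl.keystore.location' must exist on every worker node, not just on the master where the job is submitted. Below is a rough, hypothetical sketch for spot-checking which hosts can actually see the file; the path is an assumption, and there is no guarantee every TaskManager receives an element, so treat the output as a quick probe rather than an exhaustive check.

import org.apache.flink.api.scala._

import java.net.InetAddress
import java.nio.file.{Files, Paths}

// Hypothetical diagnostic job: report, per host that runs a map task,
// whether the keystore path is visible on its local filesystem.
object KeystoreProbe {
  def main(args: Array[String]): Unit = {
    // Assumed location -- substitute your own keystore path.
    val keystore =
      "/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks"

    val env = ExecutionEnvironment.getExecutionEnvironment
    val slots = math.max(env.getParallelism, 1)

    // One element per parallel slot; each map call reports the host it ran on.
    env.fromCollection(1 to slots)
      .map { _ =>
        val host = InetAddress.getLocalHost.getHostName
        s"$host -> exists=${Files.exists(Paths.get(keystore))}"
      }
      .print()
  }
}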
Glad to hear that you worked it out. Indeed, the path has to be accessible by the worker nodes. A common solution is also to put it on some DFS like HDFS and reference that; then you only need to update one file if the key changes.

On Thu, Nov 19, 2020 at 2:14 AM Fanbin Bu <[hidden email]> wrote:
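One wrinkle with the DFS approach: the Kafka client loads 'ssl.keystore.location' with plain local-file I/O (the trace above ends in java.nio.file.Files.newInputStream), so an hdfs:// or s3:// URI cannot go directly into the DDL; that is most likely why the s3://my-bucket attempt failed. The usual pattern is to keep the single source of truth on the DFS and copy it down to a node-local path on every node, e.g. from an EMR bootstrap action. Here is a minimal sketch using the Hadoop FileSystem API; both paths are assumptions:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: pull the keystore out of HDFS onto the local disk.
// Run once per node (e.g. via a bootstrap action) so the local path in the
// DDL resolves on every JobManager/TaskManager host.
object LocalizeKeystore {
  def main(args: Array[String]): Unit = {
    // Assumed locations -- adjust to your cluster layout.
    val dfsKeystore   = new Path("hdfs:///secrets/kafka.keystore.jks")
    val localKeystore = new Path("/etc/flink/ssl/kafka.keystore.jks")

    // Picks up core-site.xml/hdfs-site.xml from the classpath on EMR.
    val fs = FileSystem.get(new Configuration())
    fs.copyToLocalFile(dfsKeystore, localKeystore)
  }
}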
--
Arvid Heise | Senior Java Developer

Follow us @VervericaData

--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time

--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng