How to set keystore.jks location on EMR when reading Kafka topics via SSL


Fanbin Bu
Hi,

This is a repost with modified subject per Sri Tummala's suggestion.

I'm running Flink 1.11 on EMR and would like to read from Kafka over SSL. I put the keystore.jks file under /usr/lib/flink/... and referenced it like this:

export SSL_KEYSTORE_LOCATION=/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks

Note that this path is on the EMR master node. Both the JobManager (JM) and the TaskManagers (TMs) run on EMR core (worker) nodes.

Here is the code snippet:

val stmt = s"""
  |create table ${table.name} (${schema}, ${watermark})
  |with(
  |'connector' = 'kafka',
  |'topic' = '${table.topic}',
  |'scan.startup.mode'= '${table.scanStartUpMode}',
  |'properties.zookeeper.connect'='xxx',
  |'properties.bootstrap.servers'='xxx',
  |'properties.ssl.keystore.location'='/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks',
  |'properties.ssl.keystore.password'='xxx',
  |'properties.ssl.key.password'='xxx',
  |'properties.security.protocol'='SSL',
  |'properties.ssl.keystore.type'='JKS',
  |'properties.ssl.truststore.type'='JKS',
  |'properties.ssl.enabled.protocols'='TLSv1.2,TLSv1.1,TLSv1',
  |'properties.group.id' = '${table.name}_group_id',
  |'format' = 'json',
  |'json.ignore-parse-errors' = 'true'
  |)
""".stripMargin

tEnv.executeSql(stmt)


However, I got this exception:
Caused by: java.nio.file.NoSuchFileException: /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
even though the file is there:
[hadoop@ip-10-200-41-39 flink]$ ll /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
-rw-r--r-- 1 root root 5565 Nov 17 22:24 /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks

Things I tried:
1. The keystore.jks file itself works, since I can use the console consumer to read Kafka topics on the EMR master.
2. Setting the location to s3://my-bucket/keystore.jks did not work.

What value should I set the keystore location to?
Thanks!
Fanbin

Also attached the full exception log:

2020-11-17 09:35:49
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks of type JKS
at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:71)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:741)
... 15 more
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks of type JKS
at org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:163)
at org.apache.kafka.common.security.ssl.SslEngineBuilder.<init>(SslEngineBuilder.java:104)
at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95)
at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:69)
... 19 more
Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks of type JKS
at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:292)
at org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:144)
... 22 more
Caused by: java.nio.file.NoSuchFileException: /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at java.nio.file.Files.newByteChannel(Files.java:361)
at java.nio.file.Files.newByteChannel(Files.java:407)
at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
at java.nio.file.Files.newInputStream(Files.java:152)
at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:285)

Re: How to set keystore.jks location on EMR when reading Kafka topics via SSL

Fanbin Bu
I had to put the keystore file on the worker nodes themselves.
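
The reason is visible in the stack trace of the original post: the Kafka client opens the keystore through java.nio.file.Files, so the path is resolved on the local file system of whichever JVM constructs the consumer (here, a TaskManager on a core node), not on the machine that submitted the job. Below is a minimal sketch of a preflight check that could be run on each node to confirm the file is present and readable there. The object name KeystoreCheck and the helper describe are hypothetical and not part of Flink or Kafka.

```scala
import java.nio.file.{Files, Paths}

// Hypothetical helper: reports whether a keystore path is usable on THIS machine.
object KeystoreCheck {
  // Describes the state of `path` on the local file system of the current JVM.
  def describe(path: String): String = {
    val p = Paths.get(path)
    if (!Files.exists(p)) s"missing: $path"
    else if (!Files.isReadable(p)) s"not readable: $path"
    else s"ok: $path (${Files.size(p)} bytes)"
  }

  // Checks every path passed on the command line and prints one line per path.
  def main(args: Array[String]): Unit =
    args.foreach(p => println(describe(p)))
}
```

Running this on the master node and on a core node with the same path should show the difference that caused the NoSuchFileException above, assuming the file had only been copied to the master.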


Re: How to set keystore.jks location on EMR when reading Kafka topics via SSL

Arvid Heise-3
Glad to hear that you worked it out.

Indeed, the path has to be accessible from the worker nodes. Another common solution is to put the file on a distributed file system (DFS) such as HDFS and reference it there. Then you only need to update one file if the key changes.
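
One caveat when keeping the keystore on a DFS: the stack trace in the original post shows the Kafka client reading the keystore with java.nio.file.Files, so the value of ssl.keystore.location most likely has to be a local path in the end. The sketch below is one possible way to bridge the two, by copying the file from the DFS to a local temporary file and using that local path. It assumes the Hadoop client libraries are on the classpath (as they normally are on EMR); the object name KeystoreFetcher and the method fetchToLocal are hypothetical, and where this would be called (for example a node-level setup step, or an operator's open() in a DataStream job) depends on the deployment.

```scala
import java.net.URI
import java.nio.file.{Files, Path => LocalPath, StandardCopyOption}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path => DfsPath}

// Hypothetical helper: copies a keystore from a Hadoop-compatible file system
// (e.g. an hdfs:// URI) to a local temp file and returns the local path.
object KeystoreFetcher {
  def fetchToLocal(remoteUri: String): LocalPath = {
    // Local destination; the Kafka client can open this with java.nio.
    val local = Files.createTempFile("kafka-keystore-", ".jks")
    // FileSystem.get returns a shared, cached instance, so it is not closed here.
    val fs = FileSystem.get(new URI(remoteUri), new Configuration())
    val in = fs.open(new DfsPath(remoteUri))
    try Files.copy(in, local, StandardCopyOption.REPLACE_EXISTING)
    finally in.close()
    local
  }
}
```

The returned path (via toString) would then be the value given to 'properties.ssl.keystore.location'. This keeps a single authoritative copy of the keystore on the DFS, at the cost of one copy per process that needs it.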



--

Arvid Heise | Senior Java Developer

