keystore location on EMR

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

keystore location on EMR

Fanbin Bu
Hi,

I'm running Flink 1.11 on EMR and would like to read Kafka via SSL. I tried to put keystore.jks location under /usr/lib/flink/... like:

export SSL_KEYSTORE_LOCATION=/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks

Notice that this is on EMR master(master) node. Both JM and TMs are on EMR core(slave) nodes.

However, I got exception: Caused by: java.nio.file.NoSuchFileException: /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
even though the file is there

[hadoop@ip-10-200-41-39 flink]$ ll /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
-rw-r--r-- 1 root root 5565 Nov 17 22:24 /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks

Where should the keystore.jks be located?

Thanks,
Fanbin


Here is the full log.
2020-11-17 09:35:49
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks of type JKS
at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:71)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:741)
... 15 more
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks of type JKS
at org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:163)
at org.apache.kafka.common.security.ssl.SslEngineBuilder.<init>(SslEngineBuilder.java:104)
at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95)
at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:69)
... 19 more
Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks of type JKS
at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:292)
at org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:144)
... 22 more
Caused by: java.nio.file.NoSuchFileException: /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at java.nio.file.Files.newByteChannel(Files.java:361)
at java.nio.file.Files.newByteChannel(Files.java:407)
at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
at java.nio.file.Files.newInputStream(Files.java:152)
at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:285)
    
Reply | Threaded
Open this post in threaded view
|

Re: keystore location on EMR

Fanbin Bu

trying to put the jks on s3... unfortunately, no luck. 
i have properties set up:
'properties.ssl.keystore.location'='s3://application-bucket/kafka.keystore.jks'


got the following error message:
    at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:292)
    at org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:144)
    ... 22 more
Caused by: java.nio.file.NoSuchFileException: s3:/application-bucket/kafka.keystore.jks
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
    at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
    at java.nio.file.Files.newByteChannel(Files.java:361)
    at java.nio.file.Files.newByteChannel(Files.java:407)
    at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
    at java.nio.file.Files.newInputStream(Files.java:152)
    at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:285)
    ... 23 more

On Tue, Nov 17, 2020 at 10:01 PM Fanbin Bu <[hidden email]> wrote:
let me try to put it on s3 and change code like:
'properties.ssl.keystore.location'='s3://my-bucket/keystore.jks

Thanks,
Fanbin

On Tue, Nov 17, 2020 at 6:43 PM sri hari kali charan Tummala <[hidden email]> wrote:
Try with hdfs folder with Gard coded value inside the code and see what happens.

On Tue, 17 Nov 2020 at 18:42, sri hari kali charan Tummala <[hidden email]> wrote:
Can you use hdfs as keystone location ? Are you using oozie to run your job ? 

On Tue, 17 Nov 2020 at 17:54, Fanbin Bu <[hidden email]> wrote:
Hi Sri, my code is not github. but here is the skeleton.

val stmt = s"""
|create table ${table.name} (${schema}, ${watermark})
|with(
|'connector' = 'kafka',
|'topic' = '${table.topic}',
|'scan.startup.mode'= '${table.scanStartUpMode}',
|'properties.zookeeper.connect'='${Globals.ZOOKEEPER_CONNECT}',
|'properties.bootstrap.servers'='${Globals.BOOTSTRAP_SERVERS}',
|'properties.ssl.keystore.location'='${Globals.SSL_KEYSTORE_LOCATION}',
|'properties.ssl.keystore.password'='${Globals.KEYSTORE_PASS}',
|'properties.ssl.key.password'='${Globals.KEYSTORE_PASS}',
|'properties.security.protocol'='SSL',
|'properties.ssl.keystore.type'='JKS',
|'properties.ssl.truststore.type'='JKS',
|'properties.ssl.enabled.protocols'='TLSv1.2,TLSv1.1,TLSv1',
|'properties.group.id' = '${table.name}_group_id',
|'format' = 'json',
|'json.ignore-parse-errors' = 'true'
|)
""".stripMargin
tEnv.executeSql(stmt)

On Tue, Nov 17, 2020 at 5:40 PM sri hari kali charan Tummala <[hidden email]> wrote:
Hi Fanbin,

Can you share your Flink code which reads from Kafka using SSL ? 

Is your code on GitHub ? 

Thanks
Sri


On Tue, 17 Nov 2020 at 17:14, Fanbin Bu <[hidden email]> wrote:
Hi,

I'm running Flink 1.11 on EMR and would like to read Kafka via SSL. I tried to put keystore.jks location under /usr/lib/flink/... like:

export SSL_KEYSTORE_LOCATION=/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks

Notice that this is on EMR master(master) node. Both JM and TMs are on EMR core(slave) nodes.

However, I got exception: Caused by: java.nio.file.NoSuchFileException: /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
even though the file is there

[hadoop@ip-10-200-41-39 flink]$ ll /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
-rw-r--r-- 1 root root 5565 Nov 17 22:24 /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks

Where should the keystore.jks be located?

Thanks,
Fanbin


Here is the full log.
2020-11-17 09:35:49
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks of type JKS
at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:71)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:741)
... 15 more
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks of type JKS
at org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:163)
at org.apache.kafka.common.security.ssl.SslEngineBuilder.<init>(SslEngineBuilder.java:104)
at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95)
at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:69)
... 19 more
Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks of type JKS
at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:292)
at org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:144)
... 22 more
Caused by: java.nio.file.NoSuchFileException: /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at java.nio.file.Files.newByteChannel(Files.java:361)
at java.nio.file.Files.newByteChannel(Files.java:407)
at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
at java.nio.file.Files.newInputStream(Files.java:152)
at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:285)
    
--
Thanks & Regards
Sri Tummala

--
Thanks & Regards
Sri Tummala

--
Thanks & Regards
Sri Tummala