[Native Kubernetes] [Ceph S3] Unable to load credentials from service endpoint.


Niels Basjes
Hi,

I have a problem with accessing my own S3 system from within Flink when running on Kubernetes. 

TL;DR: I have my own S3 (Ceph). Locally my application works; when running in K8s it fails with:
Caused by: com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
Caused by: java.net.SocketException: Network is unreachable (connect failed)

I have my own Kubernetes cluster (1.17) on which I have installed Ceph and the S3 gateway that is included with it.
I have put a file on this 'S3', and in my Flink 1.10.0 application I do this:
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
final Configuration conf = new Configuration();
conf.setString("presto.s3.endpoint",         "s3.example.nl");
conf.setString("presto.s3.access-key",       "myAccessKey");
conf.setString("presto.s3.secret-key",       "mySecretKey");
FileSystem.initialize(conf, null);
senv.setParallelism(2);
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<String> rawInputStream = senv
    .readTextFile(path).name("Read input");
...

s3.example.nl is the hostname of the ingress I have attached to the S3 endpoint. In my case it is accessible via both http and https (with a valid LetsEncrypt certificate).

When I run this locally from within IntelliJ it works like a charm: it reads the data, does some stuff with it, and then writes it to ElasticSearch.

I have created an additional layer to enable the fs-s3-presto plugin with this Dockerfile:

FROM flink:1.10.0-scala_2.12
RUN mkdir /opt/flink/plugins/s3-fs-presto && cp /opt/flink/opt/flink-s3-fs-presto* /opt/flink/plugins/s3-fs-presto
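Building and pushing this image is just the usual docker build/push; roughly (the registry and tag are whatever you use, shown here with the image name referenced below):

docker build -t docker.example.nl/flink:1.10.0-2.12-s3-presto .
docker push docker.example.nl/flink:1.10.0-2.12-s3-presto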

I run Flink with this customized Docker image like this:

#!/bin/bash
./flink-1.10.0/bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=flink1100 \
  -Dtaskmanager.memory.process.size=8192m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=8 \
  -Dresourcemanager.taskmanager-timeout=3600000 \
  -Dkubernetes.container.image=docker.example.nl/flink:1.10.0-2.12-s3-presto

I then submit the job into this Kubernetes session with this command:

flink run -e kubernetes-session -Dkubernetes.cluster-id=flink1100 target/flink-table-esloader-0.1-SNAPSHOT.jar

The job starts and after about 40 seconds it fails with this exception:

Caused by: com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
at com.amazonaws.auth.EC2CredentialsFetcher.handleError(EC2CredentialsFetcher.java:183)
at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:162)
at com.amazonaws.auth.EC2CredentialsFetcher.getCredentials(EC2CredentialsFetcher.java:82)
at com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:151)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1164)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
at com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:5086)
at com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:5060)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4309)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1264)
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1239)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:563)
at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734)
at org.apache.flink.fs.s3presto.common.HadoopFileSystem.exists(HadoopFileSystem.java:152)
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:143)
at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:197)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
Caused by: java.net.SocketException: Network is unreachable (connect failed)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:607)
at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
at sun.net.www.http.HttpClient.New(HttpClient.java:339)
at sun.net.www.http.HttpClient.New(HttpClient.java:357)
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1226)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1205)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1056)
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:990)
at com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:54)
at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:108)
at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:79)
at com.amazonaws.auth.InstanceProfileCredentialsProvider$InstanceMetadataCredentialsEndpointProvider.getCredentialsEndpoint(InstanceProfileCredentialsProvider.java:174)
at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:122)
... 28 more

I have tried this with
conf.setString("presto.s3.endpoint",         "s3.example.nl");
and also with the ClusterIP and the LoadBalancer IP, and I get the same error in all cases.

I have verified, by logging into the task manager pod, that all of these endpoints return a sensible result when simply doing a curl from the command line.

I have s3cmd installed locally on my laptop.
My ~/.s3cfg looks like this, and with it I can fully access this S3 setup.
 
[default]
access_key = myAccessKey
secret_key = mySecretKey
host_base = s3.example.nl

I'm stuck, please help:
  • What is causing the difference in behaviour between running locally and running in k8s? It works locally but not in the cluster.
  • How do I figure out what network it is trying to reach in k8s?

Thanks.
--
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: [Native Kubernetes] [Ceph S3] Unable to load credentials from service endpoint.

Yang Wang
Hi Niels,

Glad to hear that you are trying the Flink native K8s integration and sharing your feedback.

What is causing the differences in behavior between local and in k8s? It works locally but not in the cluster.

In your case, the job could be executed successfully locally. That means the S3 endpoint can be accessed from
your local network environment. When you submit the job to the K8s cluster, the user `main()` is executed
locally to build the job graph. The job graph is then submitted to the cluster for execution, and the S3 endpoint is
accessed from within the K8s network. So maybe there is something wrong with the network between the taskmanager
and the S3 endpoint.

How do I figure out what network it is trying to reach in k8s?

I am not an expert on S3, so I am not sure whether the SDK fetches the credentials from the S3 endpoint. If it does,
I think you need to find out which taskmanager the source operator is running on, then exec into that Pod and
use nslookup/curl to make sure the endpoint "s3.example.nl" can be resolved and accessed, for example with something like the sketch below.
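A rough sketch (the pod name flink1100-taskmanager-1-1 is only a placeholder; take the real one from `kubectl get pods`, and the tools must of course be present in the image):

# List the pods of the session cluster
kubectl get pods | grep flink1100
# Check DNS resolution and HTTP reachability from inside the taskmanager pod
kubectl exec -it flink1100-taskmanager-1-1 -- nslookup s3.example.nl
kubectl exec -it flink1100-taskmanager-1-1 -- curl -sI https://s3.example.nl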



Best,
Yang
 

Re: [Native Kubernetes] [Ceph S3] Unable to load credentials from service endpoint.

Niels Basjes
Hi,

As I mentioned in my original email, I had already verified that the endpoints were accessible from the pods; that was not the problem.

It took me a while but I've figured out what went wrong.

Setting the configuration the way I did,
final Configuration conf = new Configuration();
conf.setString("presto.s3.endpoint",         "s3.example.nl");
conf.setString("presto.s3.access-key",       "myAccessKey");
conf.setString("presto.s3.secret-key",       "mySecretKey");
FileSystem.initialize(conf, null);
sets it in some static variables that do not get serialized and shipped to the task managers.

As a consequence, in the absence of credentials the AWS/S3 client assumes it is running inside AWS and that it can retrieve the credentials from http://169.254.170.2 (which is a non-routable link-local address).
Because this is not AWS it cannot do this, and I get the error that it cannot connect.
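For illustration, a quick way to see this same failure from inside a taskmanager pod could be something like the following (the pod name is only a placeholder):

# Placeholder pod name; take the real one from `kubectl get pods`
kubectl exec -it flink1100-taskmanager-1-1 -- curl -v --connect-timeout 2 http://169.254.170.2/
# Outside AWS this is expected to fail, just like the "Network is unreachable" in the stack trace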

For now my solution is to start the Flink session with this:
#!/bin/bash
./flink-1.10.0/bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=flink1100 \
  -Dtaskmanager.memory.process.size=8192m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dresourcemanager.taskmanager-timeout=3600000 \
  -Dkubernetes.container.image=docker.example.nl/flink:1.10.0-2.12-s3-presto \
  -Dpresto.s3.endpoint=s3.example.nl \
  -Dpresto.s3.access-key=MyAccessKey \
  -Dpresto.s3.secret-key=MySecretKey \
  -Dpresto.s3.path.style.access=true

I dislike this because now ALL jobs in this Flink cluster have the same credentials.

Is there a way to set the S3 credentials on a per-job or even per-connection basis?

Niels Basjes


Re: [Native Kubernetes] [Ceph S3] Unable to load credentials from service endpoint.

Yang Wang
Hi Niels,

You are right. The S3 related configurations you have set in your `main()` are only
applied on the client side, since the filesystem is initialized only once, in the entrypoint of
the JM/TM. AFAIK, we cannot provide different credentials for each
job in the same session cluster.

Best,
Yang
