Hi,
I have a problem accessing my own S3 system from within Flink when running on Kubernetes. TL;DR: I have my own S3 (Ceph); locally my application works, but when running in K8s it fails with a connection error.
I have my own Kubernetes cluster (1.17) on which I have installed Ceph and the S3 gateway that is included with it. I have put a file on this 'S3', and in my Flink 1.10.0 application I do this:
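A minimal sketch of that code (the bucket and file names are placeholders, the real job writes to Elasticsearch rather than printing, and handing the settings to the filesystem via FileSystem.initialize() is an assumption):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ReadFromOwnS3 {
        public static void main(String[] args) throws Exception {
            // Point the presto S3 filesystem at my own Ceph S3 gateway.
            Configuration conf = new Configuration();
            conf.setString("presto.s3.endpoint", "s3.example.nl");
            conf.setString("presto.s3.access-key", "MyAccessKey");   // placeholder
            conf.setString("presto.s3.secret-key", "MySecretKey");   // placeholder
            conf.setString("presto.s3.path.style.access", "true");
            // Assumption: the settings are handed to the S3 filesystem like this.
            FileSystem.initialize(conf);

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.readTextFile("s3://somebucket/somefile.txt")          // placeholder path
               .print();                                              // the real job writes to Elasticsearch
            env.execute("read-from-own-s3");
        }
    }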
s3.example.nl is the hostname of the ingress I have attached to the S3 endpoint. In my case it is accessible via both http and https (with a valid LetsEncrypt certificate). When I run this locally from within IntelliJ it works like a charm: it reads the data, does some stuff with it and then writes it to Elasticsearch. I have created an additional image layer to enable the fs-s3-presto plugin with this Dockerfile.
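A minimal sketch of such a Dockerfile, following the standard Flink plugins layout (the base image tag and jar version are assumptions):

    FROM flink:1.10.0-scala_2.12
    # Enable the presto S3 filesystem by copying it from opt/ into the plugins/ directory.
    RUN mkdir -p /opt/flink/plugins/s3-fs-presto && \
        cp /opt/flink/opt/flink-s3-fs-presto-1.10.0.jar /opt/flink/plugins/s3-fs-presto/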
I run Flink on Kubernetes with this customized Docker image and then submit my job into that cluster; the two commands are sketched below.
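A minimal sketch of those two commands (the jar name and paths are placeholders, and only the most relevant options are shown):

    # Start a Flink session cluster on Kubernetes using the customized image.
    ./flink-1.10.0/bin/kubernetes-session.sh \
      -Dkubernetes.cluster-id=flink1100 \
      -Dkubernetes.container.image=docker.example.nl/flink:1.10.0-2.12-s3-presto

    # Submit the job to that session cluster.
    ./flink-1.10.0/bin/flink run -d \
      -e kubernetes-session \
      -Dkubernetes.cluster-id=flink1100 \
      target/my-streaming-job.jar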
The job starts, and after about 40 seconds it fails with this exception:
I have tried this with conf.setString("presto.s3.endpoint", "s3.example.nl"); and also by using the ClusterIP and the LoadBalancer IP, and I get the same error in all cases. I have verified, by logging into the task manager pod, that all of these endpoints return a sensible result when simply doing a curl from the command line. I also have s3cmd installed locally on my laptop; my ~/.s3cfg looks like this, and with it I can fully access this S3 setup.
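A minimal sketch of that check (the taskmanager pod name is a placeholder):

    # From my laptop: find a taskmanager pod of the session cluster ...
    kubectl get pods | grep taskmanager
    # ... and check the S3 endpoint from inside it.
    kubectl exec -it flink1100-taskmanager-1-1 -- curl -sI https://s3.example.nl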
I'm stuck, please help.
Thanks.

--
Best regards / Met vriendelijke groeten,
Niels Basjes
Hi Niels,

Glad to hear that you are trying the Flink native K8s integration and sharing your feedback.

What is causing the differences in behavior between local and in K8s? It works locally but not in the cluster.

In your case the job could be executed successfully locally, which means the S3 endpoint can be reached from your local network environment. When you submit the job to the K8s cluster, the user `main()` is executed locally to build the job graph, which is then submitted to the cluster for execution. From there the S3 endpoint is accessed from within the K8s network, so maybe there is something wrong with the network between the taskmanager and the S3 endpoint.

How do I figure out what network it is trying to reach in K8s?

I am not an expert on S3, so I am not sure whether the SDK will fetch the credentials from the S3 endpoint. If it does, I think you need to find out which taskmanager the source operator is running on, then exec into that Pod and use nslookup/curl to make sure the endpoint "s3.example.nl" can be resolved and accessed successfully.

Best,
Yang

On Fri, Feb 28, 2020 at 4:56 AM Niels Basjes <[hidden email]> wrote:
Hi,

As I mentioned in my original email, I already verified that the endpoints were accessible from the pods; that was not the problem.

It took me a while, but I've figured out what went wrong. Setting the configuration like I did in my application code sets it in some static variables that do not get serialized and shipped to the task managers. As a consequence, in the absence of credentials the AWS/S3 client assumes it is running inside AWS and that it can retrieve the credentials from http://169.254.170.2 (which is non-routable). Because this is not AWS it cannot do this, and I get the error that it cannot connect.

For now my solution is to start the Flink session with this:

#!/bin/bash
./flink-1.10.0/bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=flink1100 \
  -Dtaskmanager.memory.process.size=8192m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dresourcemanager.taskmanager-timeout=3600000 \
  -Dkubernetes.container.image=docker.example.nl/flink:1.10.0-2.12-s3-presto \
  -Dpresto.s3.endpoint=s3.example.nl \
  -Dpresto.s3.access-key=MyAccessKey \
  -Dpresto.s3.secret-key=MySecretKey \
  -Dpresto.s3.path.style.access=true

I dislike this because now ALL jobs in this Flink cluster have the same credentials. Is there a way to set the S3 credentials on a per-job or even per-connection basis?

Niels Basjes

On Fri, Feb 28, 2020 at 4:38 AM Yang Wang <[hidden email]> wrote:
Best regards / Met vriendelijke groeten,
Niels Basjes
Hi Niels,

You are right. The S3-related configuration you set in your `main()` is only applied on the client side, since the filesystem is initialized only once, in the entrypoint of the JM/TM. AFAIK, we cannot provide different credentials for each job in the same session cluster.

Best,
Yang

On Fri, Feb 28, 2020 at 11:09 PM Niels Basjes <[hidden email]> wrote: