Hi,

I am trying to use Flink 1.2.1 with RocksDB as the state backend and S3 for checkpoints. I am using the Flink 1.2.1 docker images and running them in a Kubernetes cluster. I have followed the steps documented in the Flink documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/aws.html#s3-simple-storage-service

I am using AWS IAM roles to set up access for S3. The role has the actions "s3:GetObject", "s3:ListBucket", "s3:PutObject", "s3:DeleteObject" on the bucket.

When I run a job, the jobmanager logs the exception below:

java.io.IOException: The given file URI (s3://$MY_TEST_BUCKET/checkpoints) points to the HDFS NameNode at $MY_TEST_BUCKET, but the File System could not be initialized with that address: Unable to load AWS credentials from any provider in the chain
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:334)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:265)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:304)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:105)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:172)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createStreamFactory(RocksDBStateBackend.java:219)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStreamFactory(StreamTask.java:803)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:220)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:655)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:643)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
    at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
    at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
    at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:320)
    ... 13 more

I checked whether the jobmanager pod in the K8s cluster has the correct IAM role applied: "curl http://169.254.169.254/latest/meta-data/iam/security-credentials/" returned the correct role. After this, I installed the AWS CLI on the jobmanager pod and could download from and upload to $MY_TEST_BUCKET. This confirmed that the jobmanager pod has the correct IAM role associated with it.

So I am not sure why the AWS library in Flink is not able to load the credentials. Any thoughts or suggestions to fix or troubleshoot? Appreciate the help.

Regards,
Abhinav Bajaj
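PS: For context, the state backend is wired up roughly as follows. This is a minimal sketch rather than our actual job code; the class name, checkpoint interval, and bucket path are placeholders.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Take a checkpoint every 60 seconds; operator state lives in RocksDB and
        // the snapshots are written to S3 through the s3a filesystem.
        env.enableCheckpointing(60_000);
        env.setStateBackend(new RocksDBStateBackend("s3://MY_TEST_BUCKET/checkpoints"));
        // ... build the topology and call env.execute() ...
    }
}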
|
Hiya,
This sounds like it may be similar to the issue I had when running on ECS. Take a look at my ticket for how I got around this, and see if it's any help: https://issues.apache.org/jira/browse/FLINK-8439

Dyana
|
Using AWS credentials with Kubernetes is not trivial. Have you looked at the AWS / Kubernetes docs and at projects like https://github.com/jtblin/kube2iam, which bridge between containers and AWS credentials?

Also, Flink 1.2.1 is quite old; you may want to try a newer version. 1.4.x has a bit of an overhaul of the filesystems.
|
Hi,

Thanks for the suggestions.

We are using kube2iam for our Kubernetes cluster, and it seems to be set up correctly to support IAM roles. I also followed the AWS documentation to troubleshoot the pods' access to the temporary security credentials, and the AWS CLI works as expected on the cluster pods. I also tested with a script that uses aws-java-sdk-1.7.4.jar to get credentials through the InstanceProfileCredentialsProvider.

At this point, I think Flink is not using the InstanceProfileCredentialsProvider in my setup, probably because of a dependency mismatch. I am using the same dependencies and core-site.xml as documented by Flink.
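The standalone credentials test was roughly equivalent to the sketch below (not the actual script; the class name and printed message are only illustrative, but the provider is the com.amazonaws.auth.InstanceProfileCredentialsProvider shipped with aws-java-sdk 1.7.4):

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;

public class CredentialsCheck {
    public static void main(String[] args) {
        // Ask the EC2 instance metadata service (169.254.169.254) for the temporary
        // credentials of the pod's IAM role, the same way s3a should when no access
        // keys are configured in core-site.xml.
        InstanceProfileCredentialsProvider provider = new InstanceProfileCredentialsProvider();
        AWSCredentials credentials = provider.getCredentials();
        System.out.println("Loaded access key id: " + credentials.getAWSAccessKeyId());
    }
}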
Am I missing some dependencies here? Any suggestions on troubleshooting the issue? We need to support Flink 1.2.1 for now.

Thanks for your response.

~ Abhinav
|
Hi!

This is pretty much all in Hadoop's magic, from Flink's view, once this has been delegated to s3a.

I seem to recall that there was something with older hadoop-aws versions or AWS SDK versions; there were cases where that version needed to be bumped. What we use in the pre-bundled S3 connectors in Flink 1.4+ is based on Hadoop 2.8.1, and we explicitly bump the AWS SDK version to 1.11.95.

It might help in your case as well to bump the following dependencies (replace the jar files in "lib" or in the hadoop2 fat jar):

<dependency>
  <groupId>com.amazonaws</groupId>
  <artifactId>aws-java-sdk-core</artifactId>
  <version>1.11.95</version>
</dependency>
<dependency>
  <groupId>com.amazonaws</groupId>
  <artifactId>aws-java-sdk-kms</artifactId>
  <version>1.11.95</version>
</dependency>
<dependency>
  <groupId>com.amazonaws</groupId>
  <artifactId>aws-java-sdk-s3</artifactId>
  <version>1.11.95</version>
</dependency>

Stephan
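PS: if a dependency mismatch is the suspicion, a quick sanity check is to print which SDK actually gets loaded at runtime. A sketch (assuming aws-java-sdk-core is on the classpath; the class name is made up):

import com.amazonaws.util.VersionInfoUtils;

public class SdkVersionCheck {
    public static void main(String[] args) {
        // The AWS SDK version that was actually loaded, useful for spotting an
        // old aws-java-sdk jar shadowing the bumped one.
        System.out.println("AWS SDK version: " + VersionInfoUtils.getVersion());
        // The jar the class came from, i.e. the file to replace in lib/.
        System.out.println("Loaded from: "
            + VersionInfoUtils.class.getProtectionDomain().getCodeSource().getLocation());
    }
}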
|
Hi,

Thanks for your replies and help. We found the root cause and fixed the problem. I will share the details below for everyone, but here's the meat: we had to add the IAM role to the taskmanager pods. The exception I noticed in the jobmanager logs actually originated from a taskmanager. As soon as I added the IAM role to the taskmanagers, checkpointing worked.

Details: I had intentionally added the IAM role to the jobmanager and not to the taskmanagers. I had misunderstood checkpointing and assumed the checkpoint data is written to the filesystem by the jobmanager, whereas it is written by the taskmanagers. I also did not want the taskmanagers to have access to S3, for security reasons.

To investigate, I ran a script on the jobmanager pod that uses the AWS credentials provider chain to load credentials with the same Hadoop and AWS dependencies. The script could successfully access the IAM role via the instance profile provider. I then enabled debug logging, checked the taskmanager logs, and noticed the same exception along with debug output from the AWS credentials provider. This was the hint for me and explained the behavior I observed.

Thanks again,
~ Abhi
|