S3 + Parquet credentials issue

S3 + Parquet credentials issue

Angelo G.
Hello,

Trying to read a Parquet file located in S3 leads to an AWS credentials exception. Switching to another format (raw, for example) works fine as far as file access is concerned.

This is a snippet of code to reproduce the issue:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

static void parquetS3Error() {

    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();

    TableEnvironment t_env = TableEnvironment.create(settings);

    // The parquet format gives an error:
    // Caused by: java.net.SocketTimeoutException: doesBucketExist on bucket-prueba-medusa: com.amazonaws.AmazonClientException:
    // No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
    // com.amazonaws.SdkClientException: Failed to connect to service endpoint:
    t_env.executeSql("CREATE TABLE backup ( `date` STRING, `value` INT) WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 'parquet')");

    // Other formats (e.g. raw) work properly:
    // Job has been submitted with JobID 6ecd31d322aba759f9b8b591e9f4fed5
    // +--------------------------------+
    // | url |
    // +--------------------------------+
    // | [80, 65, 82, 49, 21, 0, 21,... |
    // | [0, 0, 0, 50, 48, 50, 49, 4... |
    t_env.executeSql("CREATE TABLE backup ( `url` BINARY) WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 'raw')");

    Table t1 = t_env.from("backup");

    t1.execute().print();
}
Flink version is 1.12.2.

Please find attached the pom with dependencies and version numbers.

What would be a suitable workaround for this?

Thank you very much.

Angelo.


 

Attachments:
  • pom.xml (10K)

Re: S3 + Parquet credentials issue

Svend
Hi Angelo,

I've not worked exactly in the situation you described, although I've had to configure S3 access from a Flink application recently, and here are a couple of things I learnt along the way:

* You should normally not need to include flink-s3-fs-hadoop or hadoop-mapreduce-client-core in your classpath, but should rather make flink-s3-fs-hadoop available to Flink by putting it into the plugins folder. The motivation is that this jar is a fat jar containing a lot of Hadoop and AWS classes, so including it in your classpath quickly leads to conflicts. The plugins folder is associated with a separate classpath, which helps avoid those conflicts.

* Under the hood, Flink uses the hadoop-aws library to connect to S3, so the documentation on how to configure it, and especially security access, is available in [1].

* Ideally, when running on AWS, your code should not be using BasicAWSCredentialsProvider; instead the application should assume a role, which you associate with IAM permissions. If that's your case, the specific documentation for that situation is in [2]. If you're running some test locally on your laptop, BasicAWSCredentialsProvider with a key id and secret pointing to a dev account may be appropriate.

* As I understand it, any configuration entry in flink-conf.yaml that starts with "fs.s3a" is forwarded by Flink to hadoop-aws (I found that info in [3]), so by reading the documentation in [1] and [2] you might be able to figure out which parameters are relevant to your case and then set them with the mechanism just mentioned. For example, in my case, I simply add this to flink-conf.yaml:

fs.s3a.aws.credentials.provider: "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"

* You can debug the various operations that are attempted on S3 by setting the logger org.apache.hadoop.fs.s3a to DEBUG level; see the sketch below.
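
For example, a minimal sketch of enabling that logger, assuming the log4j2-style conf/log4j.properties that ships with Flink 1.12 (the logger id "s3a" is just a label I picked):

# add to conf/log4j.properties to trace S3A filesystem operations
logger.s3a.name = org.apache.hadoop.fs.s3a
logger.s3a.level = DEBUG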


Good luck :)

Svend







Re: S3 + Parquet credentials issue

Till Rohrmann
Hi Angelo,

what Svend has written is very good advice. Additionally, you could give us a bit more context by posting the exact stack trace and the exact configuration you use to deploy the Flink cluster. To me this looks like a configuration/setup problem in combination with AWS.

Cheers,
Till



Re: S3 + Parquet credentials issue

Angelo G.
Thank you Svend and Till for your help.

Sorry for the late response.

I'll try to give more information about the issue:

> I've not worked exactly in the situation you described, although I've had to configure S3 access from a Flink application recently, and here are a couple of things I learnt along the way:
>
> * You should normally not need to include flink-s3-fs-hadoop or hadoop-mapreduce-client-core in your classpath, but should rather make flink-s3-fs-hadoop available to Flink by putting it into the plugins folder. [...]

Following your advice I've left these dependencies out of the pom. Thank you for the explanation.

> * Under the hood, Flink uses the hadoop-aws library to connect to S3, so the documentation on how to configure it, and especially security access, is available in [1].

In our case, the connection to S3 has to be made via an access/secret key pair.

> * Ideally, when running on AWS, your code should not be using BasicAWSCredentialsProvider; instead the application should assume a role, which you associate with IAM permissions. [...]

Yes, the Flink documentation notes that IAM is the recommended way to access S3, but I am forced to use access/secret keys. I'm not specifying a credentials provider in flink-conf.yaml; BasicAWSCredentialsProvider seems to be the default provider for Flink. But, as we will see, this message appears only when trying to read the Parquet format; other formats pose no problem.

> * As I understand it, any configuration entry in flink-conf.yaml that starts with "fs.s3a" is forwarded by Flink to hadoop-aws (I found that info in [3]) [...]

My flink-conf.yaml is as follows:
 
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
jobmanager.execution.failover-strategy: region
fs.s3a.path-style: true
fs.s3a.region: eu-west-3
fs.s3a.bucket.testbucket.access.key: xxxx
fs.s3a.bucket.testbucket.secret.key: xxxx



> what Svend has written is very good advice. Additionally, you could give us a bit more context by posting the exact stack trace and the exact configuration you use to deploy the Flink cluster. To me this looks like a configuration/setup problem in combination with AWS.

The cluster setup for the tests is as follows:

flink-1.12.2-bin-scala_2.12.tgz unzipped in the home folder.
flink-1.12.2/opt/flink-s3-fs-hadoop-1.12.2.jar copied to flink-1.12.2/plugins/flink-s3-fs-hadoop/
flink-conf.yaml with the above contents.

The job is being launched like this:
~/flink-1.12.2$ ./bin/flink run -Dexecution.runtime-mode=BATCH    -c org.apache.flink.s3.CompactDemo /home/xxx/git/recovery/flink-s3/target/flink-s3-1.0-SNAPSHOT.jar
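
For reference, the setup above amounts to roughly the following commands (a sketch; paths are illustrative and a local standalone cluster is assumed):

# extract the distribution into the home folder
tar xzf flink-1.12.2-bin-scala_2.12.tgz -C ~
# make the S3 filesystem available as a plugin (it gets its own classpath)
mkdir -p ~/flink-1.12.2/plugins/flink-s3-fs-hadoop
cp ~/flink-1.12.2/opt/flink-s3-fs-hadoop-1.12.2.jar ~/flink-1.12.2/plugins/flink-s3-fs-hadoop/
# edit conf/flink-conf.yaml as shown above, then start the local cluster
~/flink-1.12.2/bin/start-cluster.sh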

Please find attached the two types of traces: one when using the 'raw' format for the table (which works ok) and the other when the 'parquet' format is used (which fails).


Again, sorry for the delayed response, and thank you very much for your help.



Attachments:
  • traces.zip (63K)

Re: S3 + Parquet credentials issue

rmetzger0
Thanks for the logs.

The OK job seems to read from "s3a://test-bucket/", while the KO job reads from "s3a://bucket-test/". Could it be that you are just trying to access the wrong bucket?

What I also found interesting in the KO job's TaskManager log is this message:

Caused by: java.net.NoRouteToHostException: No route to host (Host unreachable)
at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.8.0_171]
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[?:1.8.0_171]
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[?:1.8.0_171]
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[?:1.8.0_171]
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:1.8.0_171]
at java.net.Socket.connect(Socket.java:589) ~[?:1.8.0_171]
at sun.net.NetworkClient.doConnect(NetworkClient.java:175) ~[?:1.8.0_171]
at sun.net.www.http.HttpClient.openServer(HttpClient.java:463) ~[?:1.8.0_171]
at sun.net.www.http.HttpClient.openServer(HttpClient.java:558) ~[?:1.8.0_171]
at sun.net.www.http.HttpClient.<init>(HttpClient.java:242) ~[?:1.8.0_171]
at sun.net.www.http.HttpClient.New(HttpClient.java:339) ~[?:1.8.0_171]
at sun.net.www.http.HttpClient.New(HttpClient.java:357) ~[?:1.8.0_171]
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220) ~[?:1.8.0_171]
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1199) ~[?:1.8.0_171]
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050) ~[?:1.8.0_171]
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984) ~[?:1.8.0_171]
at com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:52) ~[blob_p-575afa7acc2fe3049b65534303a189df3afe9895-6c71352c89388f6a3754b9b72482e6d2:?]
at com.amazonaws.internal.EC2ResourceFetcher.doReadResource(EC2ResourceFetcher.java:80) ~[blob_p-575afa7acc2fe3049b65534303a189df3afe9895-6c71352c89388f6a3754b9b72482e6d2:?]


Does your env allow access to all AWS resources?



Re: S3 + Parquet credentials issue

Angelo G.
Hello, Robert.

I manually changed the bucket names and other potentially sensitive data in the logs. The bucket names are ok, since changing the format from 'parquet' to 'raw' allows the data to be retrieved. Sorry for the confusion.

> Does your env allow access to all AWS resources?

Yes, I have full access to the AWS objects.

Interesting fact: I have checked that putting the access/secret keys in OS environment variables and instructing Flink to use EnvironmentVariableCredentialsProvider in flink-conf.yaml works OK for both Parquet and raw. The problem is that I won't be allowed to use environment variables in the production environment.
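
For reference, the combination that works looks roughly like this (a sketch; the key values are placeholders):

# flink-conf.yaml
fs.s3a.aws.credentials.provider: com.amazonaws.auth.EnvironmentVariableCredentialsProvider

# environment of the Flink processes (placeholder values)
export AWS_ACCESS_KEY_ID=xxxx
export AWS_SECRET_ACCESS_KEY=xxxx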

Thank you very much.



