Issue reading from S3

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Issue reading from S3

Angelo G.
Hi,

I'm trying to read from and write to S3 with Flink 1.12.2. I'm submitting the job to local cluster (tar.gz distribution). I do not have a Hadoop installation running in the same machine. S3 (not Amazon) is running in a remote location and I have access to it via endpoint and access/secret keys.

The issue is that I'm able to read and write from and to S3 when using StreamExecutionEnvironment.readTextFile and DataStrean.writeAsText methods but I can't read from S3 when using the table API.

This is the application:

package org.apache.flink;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class ReadTables {

    public static void main(String[] args) throws Exception {

        // CLASSIC API (PLAIN TEXT)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds = env.readTextFile("s3a://bucket/source.txt");

        ds.writeAsText("s3a://bucket/dest.txt", FileSystem.WriteMode.OVERWRITE);

        env.execute();


        // TABLE API
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();

        TableEnvironment t_env = TableEnvironment.create(settings);

        t_env.getConfig().getConfiguration().setString("parallelism.default", "1");
      
        t_env.executeSql("CREATE TABLE example (  `date` STRING,  `value` INT) WITH ( 'connector' = 'filesystem', 'path' = 's3a://bucket/xxx/yyy/', 'format' = 'parquet')");

        Table t1 = t_env.from("example");

        t1.execute().print();

    }

}

The first job works properly, reading the source.txt file and writing it to dest.txt.

The second job does not work:

$~/flink-1.12.2$ ./bin/flink run -Dexecution.runtime-mode=BATCH -c org.apache.flink.ReadTables flink-s3-1.0-SNAPSHOT.jar;

Job has been submitted with JobID c690f2222aed0051d1501d5b9747b56f
Program execution finished
Job with JobID c690f2222aed0051d1501d5b9747b56f has finished.
Job Runtime: 17358 ms

Job has been submitted with JobID ebe54017faa83af33923d50892283e11
+--------------------------------+-------------+
|                           date |       value |
+--------------------------------+-------------+

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to fetch next result
Caused by: java.lang.RuntimeException: Failed to fetch next result
Caused by: java.io.IOException: Failed to fetch job execution result
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: ebe54017faa83af33923d50892283e11)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: ebe54017faa83af33923d50892283e11)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
Caused by: java.net.SocketTimeoutException: doesBucketExist on scib-des-cm-fipoac-medusa: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint: 
Caused by: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint: 
Caused by: com.amazonaws.SdkClientException: Failed to connect to service endpoint: 
Caused by: java.net.SocketTimeoutException: connect timed out

I have the access credentials configured in flink-conf.yaml file:

s3a.endpoint: http://s3.xxxxxxx
s3a.path-style: true
s3a.access-key: xxxxxxxxx
s3a.secret-key: xxxxxxxxx
s3a.entropy.key: _entropy_
s3a.entropy.length: 4
s3a.region: s3
s3a.bucket: xxxxxxxxx


I copied the flink-s3-fs-hadoop jar in the plugins folder but I had to add it as a dependency (not provided) to the pom, otherwise a S3AFileSystem 'class not found' exception arises.

Thank you for your help,

Angelo.


Reply | Threaded
Open this post in threaded view
|

Re: Issue reading from S3

Yun Gao
Hi Angelo,

I tried the fail case provied with a similar one:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();

TableEnvironment t_env = TableEnvironment.create(settings);

t_env.getConfig().getConfiguration().setString("parallelism.default", "1");

t_env.executeSql("CREATE TABLE example ( `url` STRING) WITH ( 'connector' = 'filesystem', 'path' = 's3a://whatnamedoyouwant/links', 'format' = 'raw')");

Table t1 = t_env.from("example");

t1.execute().print();

However, it seems the job could be executed successfully. 

I further tried with the configuration, and found that the exception
is thrown if there is no s3a.access-key or s3a.secret-key
configured. Could you have a look at if the two configuration items
are effective ?

Also I only configured the s3a.path-style: true, s3a.access-key and
s3a.secret-key, is it possible to remove the other configuration items
and have a try ? 

Best,
Yun



------------------Original Mail ------------------
Sender:Angelo G. <[hidden email]>
Send Date:Wed May 19 00:24:42 2021
Recipients:Flink User Mail List <[hidden email]>
Subject:Issue reading from S3
Hi,

I'm trying to read from and write to S3 with Flink 1.12.2. I'm submitting the job to local cluster (tar.gz distribution). I do not have a Hadoop installation running in the same machine. S3 (not Amazon) is running in a remote location and I have access to it via endpoint and access/secret keys.

The issue is that I'm able to read and write from and to S3 when using StreamExecutionEnvironment.readTextFile and DataStrean.writeAsText methods but I can't read from S3 when using the table API.

This is the application:

package org.apache.flink;import org.apache.flink.core.fs.FileSystem;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableEnvironment;public class ReadTables {    public static void main(String[] args) throws Exception {        // CLASSIC API (PLAIN TEXT)        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        DataStream<String> ds = env.readTextFile("s3a://bucket/source.txt");        ds.writeAsText("s3a://bucket/dest.txt", FileSystem.WriteMode.OVERWRITE);        env.execute();        // TABLE API        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();        TableEnvironment t_env = TableEnvironment.create(settings);        t_env.getConfig().getConfiguration().setString("parallelism.default", "1");              t_env.executeSql("CREATE TABLE example (  `date` STRING,  `value` INT) WITH ( 'connector' = 'filesystem', 'path' = 's3a://bucket/xxx/yyy/', 'format' = 'parquet')");        Table t1 = t_env.from("example");        t1.execute().print();    }}

The first job works properly, reading the source.txt file and writing it to dest.txt.

The second job does not work:

$~/flink-1.12.2$ ./bin/flink run -Dexecution.runtime-mode=BATCH -c org.apache.flink.ReadTables flink-s3-1.0-SNAPSHOT.jar;

Job has been submitted with JobID c690f2222aed0051d1501d5b9747b56f
Program execution finished
Job with JobID c690f2222aed0051d1501d5b9747b56f has finished.
Job Runtime: 17358 ms

Job has been submitted with JobID ebe54017faa83af33923d50892283e11
+--------------------------------+-------------+
|                           date |       value |
+--------------------------------+-------------+

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to fetch next result
Caused by: java.lang.RuntimeException: Failed to fetch next result
Caused by: java.io.IOException: Failed to fetch job execution result
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: ebe54017faa83af33923d50892283e11)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: ebe54017faa83af33923d50892283e11)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
Caused by: java.net.SocketTimeoutException: doesBucketExist on scib-des-cm-fipoac-medusa: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint: 
Caused by: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint: 
Caused by: com.amazonaws.SdkClientException: Failed to connect to service endpoint: 
Caused by: java.net.SocketTimeoutException: connect timed out

I have the access credentials configured in flink-conf.yaml file:

s3a.endpoint: http://s3.xxxxxxx
s3a.path-style: true
s3a.access-key: xxxxxxxxx
s3a.secret-key: xxxxxxxxx
s3a.entropy.key: _entropy_
s3a.entropy.length: 4
s3a.region: s3
s3a.bucket: xxxxxxxxx


I copied the flink-s3-fs-hadoop jar in the plugins folder but I had to add it as a dependency (not provided) to the pom, otherwise a S3AFileSystem 'class not found' exception arises.

Thank you for your help,

Angelo.


Reply | Threaded
Open this post in threaded view
|

Re: Issue reading from S3

Angelo G.
Hi Yun Gao,

Thank you for your prompt response.

I've changed the table 'format' from 'parquet' to 'raw' as in your example and I've been able to access the file:

Job has been submitted with JobID 441e7518bb615109624c1f33f222475b
+--------------------------------+
|                            url |
+--------------------------------+
|AR1 ����
| 2021-05-16 ���� ... |
|                   2021-05-16 |
| 2021-05-16 ... |

So it seems the problem is linked with the parquet dependencies. This is my POM:

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-s3</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Flink Quickstart Job</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.12.2</flink.version>
<target.java.version>1.8</target.java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.12.1</log4j.version>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-s3-fs-hadoop</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.flink.StreamingJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<versionRange>[3.1.1,)</versionRange>
<goals>
<goal>shade</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>

Another interesting fact is when exporting the access and secret key as env variables and adding:

fs.s3a.aws.credentials.provider: com.amazonaws.auth.EnvironmentVariableCredentialsProvider

to flink-conf.yaml I'm able to read and decode the parquet file properly:

Job has been submitted with JobID 15453b8be2bddcb49c1141a01013bf81
+--------------------------------+-------------+
|                           date |       value |
+--------------------------------+-------------+
|                     2021-05-16 |           7 |
|                     2021-05-16 |           8 |
|                     2021-05-16 |           8 |
|                     2021-05-16 |           8 |
|                     2021-05-16 |           1 |
|                     2021-05-16 |           3 |
|                     2021-05-16 |           9 |
|                     2021-05-16 |           9 |
|                     2021-05-16 |           8 |
|                     2021-05-16 |           7 |
|                     2021-05-16 |           9 



We won't be able to set env variables in production, so this is not a valid workaround for us. 

Please have a look to the POM so you can tell a dependency miss or misuse of some sort.

Thank you very much.

Angelo.


On Thu, May 20, 2021 at 11:54 AM Yun Gao <[hidden email]> wrote:
Hi Angelo,

I tried the fail case provied with a similar one:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();

TableEnvironment t_env = TableEnvironment.create(settings);

t_env.getConfig().getConfiguration().setString("parallelism.default", "1");

t_env.executeSql("CREATE TABLE example ( `url` STRING) WITH ( 'connector' = 'filesystem', 'path' = 's3a://whatnamedoyouwant/links', 'format' = 'raw')");

Table t1 = t_env.from("example");

t1.execute().print();

However, it seems the job could be executed successfully. 

I further tried with the configuration, and found that the exception
is thrown if there is no s3a.access-key or s3a.secret-key
configured. Could you have a look at if the two configuration items
are effective ?

Also I only configured the s3a.path-style: true, s3a.access-key and
s3a.secret-key, is it possible to remove the other configuration items
and have a try ? 

Best,
Yun



------------------Original Mail ------------------
Sender:Angelo G. <[hidden email]>
Send Date:Wed May 19 00:24:42 2021
Recipients:Flink User Mail List <[hidden email]>
Subject:Issue reading from S3
Hi,

I'm trying to read from and write to S3 with Flink 1.12.2. I'm submitting the job to local cluster (tar.gz distribution). I do not have a Hadoop installation running in the same machine. S3 (not Amazon) is running in a remote location and I have access to it via endpoint and access/secret keys.

The issue is that I'm able to read and write from and to S3 when using StreamExecutionEnvironment.readTextFile and DataStrean.writeAsText methods but I can't read from S3 when using the table API.

This is the application:

package org.apache.flink;import org.apache.flink.core.fs.FileSystem;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableEnvironment;public class ReadTables {    public static void main(String[] args) throws Exception {        // CLASSIC API (PLAIN TEXT)        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        DataStream<String> ds = env.readTextFile("s3a://bucket/source.txt");        ds.writeAsText("s3a://bucket/dest.txt", FileSystem.WriteMode.OVERWRITE);        env.execute();        // TABLE API        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();        TableEnvironment t_env = TableEnvironment.create(settings);        t_env.getConfig().getConfiguration().setString("parallelism.default", "1");              t_env.executeSql("CREATE TABLE example (  `date` STRING,  `value` INT) WITH ( 'connector' = 'filesystem', 'path' = 's3a://bucket/xxx/yyy/', 'format' = 'parquet')");        Table t1 = t_env.from("example");        t1.execute().print();    }}

The first job works properly, reading the source.txt file and writing it to dest.txt.

The second job does not work:

$~/flink-1.12.2$ ./bin/flink run -Dexecution.runtime-mode=BATCH -c org.apache.flink.ReadTables flink-s3-1.0-SNAPSHOT.jar;

Job has been submitted with JobID c690f2222aed0051d1501d5b9747b56f
Program execution finished
Job with JobID c690f2222aed0051d1501d5b9747b56f has finished.
Job Runtime: 17358 ms

Job has been submitted with JobID ebe54017faa83af33923d50892283e11
+--------------------------------+-------------+
|                           date |       value |
+--------------------------------+-------------+

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to fetch next result
Caused by: java.lang.RuntimeException: Failed to fetch next result
Caused by: java.io.IOException: Failed to fetch job execution result
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: ebe54017faa83af33923d50892283e11)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: ebe54017faa83af33923d50892283e11)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
Caused by: java.net.SocketTimeoutException: doesBucketExist on scib-des-cm-fipoac-medusa: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint: 
Caused by: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint: 
Caused by: com.amazonaws.SdkClientException: Failed to connect to service endpoint: 
Caused by: java.net.SocketTimeoutException: connect timed out

I have the access credentials configured in flink-conf.yaml file:

s3a.endpoint: http://s3.xxxxxxx
s3a.path-style: true
s3a.access-key: xxxxxxxxx
s3a.secret-key: xxxxxxxxx
s3a.entropy.key: _entropy_
s3a.entropy.length: 4
s3a.region: s3
s3a.bucket: xxxxxxxxx


I copied the flink-s3-fs-hadoop jar in the plugins folder but I had to add it as a dependency (not provided) to the pom, otherwise a S3AFileSystem 'class not found' exception arises.

Thank you for your help,

Angelo.