http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Issue-reading-from-S3-tp43804.html
Hi,
I'm trying to read from and write to S3 with Flink 1.12.2. I'm submitting the job to a local cluster (tar.gz distribution). I do not have a Hadoop installation running on the same machine. S3 (not Amazon) is running in a remote location and I have access to it via an endpoint and access/secret keys.
The issue is that I'm able to read from and write to S3 using StreamExecutionEnvironment.readTextFile and DataStream.writeAsText, but I can't read from S3 when using the Table API.
This is the application:
package org.apache.flink;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class ReadTables {

    public static void main(String[] args) throws Exception {

        // CLASSIC API (PLAIN TEXT): copy a text file from S3 back to S3; this job works
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds = env.readTextFile("s3a://bucket/source.txt");
        ds.writeAsText("s3a://bucket/dest.txt", FileSystem.WriteMode.OVERWRITE);

        env.execute();

        // TABLE API: read Parquet files from S3 via the filesystem connector; this job fails
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();
        TableEnvironment t_env = TableEnvironment.create(settings);
        t_env.getConfig().getConfiguration().setString("parallelism.default", "1");

        t_env.executeSql("CREATE TABLE example ( `date` STRING, `value` INT) WITH ( 'connector' = 'filesystem', 'path' = 's3a://bucket/xxx/yyy/', 'format' = 'parquet')");

        Table t1 = t_env.from("example");
        t1.execute().print();
    }
}
The first job works properly, reading the source.txt file and writing it to dest.txt.
The second job does not work:
~/flink-1.12.2$ ./bin/flink run -Dexecution.runtime-mode=BATCH -c org.apache.flink.ReadTables flink-s3-1.0-SNAPSHOT.jar
Job has been submitted with JobID c690f2222aed0051d1501d5b9747b56f
Program execution finished
Job with JobID c690f2222aed0051d1501d5b9747b56f has finished.
Job Runtime: 17358 ms
Job has been submitted with JobID ebe54017faa83af33923d50892283e11
+--------------------------------+-------------+
| date | value |
+--------------------------------+-------------+
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to fetch next result
Caused by: java.lang.RuntimeException: Failed to fetch next result
Caused by: java.io.IOException: Failed to fetch job execution result
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: ebe54017faa83af33923d50892283e11)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: ebe54017faa83af33923d50892283e11)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
Caused by: java.net.SocketTimeoutException: doesBucketExist on scib-des-cm-fipoac-medusa: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint:
Caused by: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint:
Caused by: com.amazonaws.SdkClientException: Failed to connect to service endpoint:
Caused by: java.net.SocketTimeoutException: connect timed out
I have the access credentials configured in the flink-conf.yaml file; the first job evidently picks them up, since it reads and writes successfully:
s3a.endpoint: http://s3.xxxxxxx
s3a.path-style: true
s3a.access-key: xxxxxxxxx
s3a.secret-key: xxxxxxxxx
s3a.entropy.key: _entropy_
s3a.entropy.length: 4
s3a.region: s3
s3a.bucket: xxxxxxxxx
I copied the flink-s3-fs-hadoop jar into the plugins folder, but I also had to add it to the pom as a regular (not provided) dependency, otherwise a 'class not found' exception for S3AFileSystem arises. The setup is sketched below.
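For reference, this is roughly how I installed the plugin (paths relative to the distribution root; the jar ships in opt/ and goes into its own subfolder under plugins/, as the plugin mechanism expects):

~/flink-1.12.2$ mkdir -p plugins/s3-fs-hadoop
~/flink-1.12.2$ cp opt/flink-s3-fs-hadoop-1.12.2.jar plugins/s3-fs-hadoop/

The pom dependency I had to add on top of that is org.apache.flink:flink-s3-fs-hadoop, version 1.12.2, with the default compile scope.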
Thank you for your help,
Angelo.