Hi,
I'm trying to read from and write to S3 with Flink 1.12.2. I'm submitting the job to local cluster (tar.gz distribution). I do not have a Hadoop installation running in the same machine. S3 (not Amazon) is running in a remote location and I have access to it via endpoint and access/secret keys. The issue is that I'm able to read and write from and to S3 when using StreamExecutionEnvironment.readTextFile and DataStrean.writeAsText methods but I can't read from S3 when using the table API. This is the application: package org.apache.flink; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; public class ReadTables { public static void main(String[] args) throws Exception { // CLASSIC API (PLAIN TEXT) StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> ds = env.readTextFile("s3a://bucket/source.txt"); ds.writeAsText("s3a://bucket/dest.txt", FileSystem.WriteMode.OVERWRITE); env.execute(); // TABLE API EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build(); TableEnvironment t_env = TableEnvironment.create(settings); t_env.getConfig().getConfiguration().setString("parallelism.default", "1"); t_env.executeSql("CREATE TABLE example ( `date` STRING, `value` INT) WITH ( 'connector' = 'filesystem', 'path' = 's3a://bucket/xxx/yyy/', 'format' = 'parquet')"); Table t1 = t_env.from("example"); t1.execute().print(); } } The first job works properly, reading the source.txt file and writing it to dest.txt. The second job does not work: $~/flink-1.12.2$ ./bin/flink run -Dexecution.runtime-mode=BATCH -c org.apache.flink.ReadTables flink-s3-1.0-SNAPSHOT.jar; Job has been submitted with JobID c690f2222aed0051d1501d5b9747b56f Program execution finished Job with JobID c690f2222aed0051d1501d5b9747b56f has finished. Job Runtime: 17358 ms Job has been submitted with JobID ebe54017faa83af33923d50892283e11 +--------------------------------+-------------+ | date | value | +--------------------------------+-------------+ ------------------------------------------------------------ The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to fetch next result Caused by: java.lang.RuntimeException: Failed to fetch next result Caused by: java.io.IOException: Failed to fetch job execution result Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: ebe54017faa83af33923d50892283e11) Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: ebe54017faa83af33923d50892283e11) Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
Caused by: java.net.SocketTimeoutException: doesBucketExist on scib-des-cm-fipoac-medusa: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint:
Caused by: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint:
Caused by: com.amazonaws.SdkClientException: Failed to connect to service endpoint:
Caused by: java.net.SocketTimeoutException: connect timed out

I have the access credentials configured in the flink-conf.yaml file:

s3a.endpoint: http://s3.xxxxxxx
s3a.path-style: true
s3a.access-key: xxxxxxxxx
s3a.secret-key: xxxxxxxxx
s3a.entropy.key: _entropy_
s3a.entropy.length: 4
s3a.region: s3
s3a.bucket: xxxxxxxxx

I copied the flink-s3-fs-hadoop jar into the plugins folder, but I had to add it to the POM as a dependency (not provided) as well; otherwise a 'class not found' exception for S3AFileSystem arises (the layout I used is sketched after this message).

Thank you for your help,

Angelo.
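A sketch of the plugin layout described above, assuming the standard Flink 1.12.2 distribution where the jar ships in the opt/ folder (paths and the exact jar name depend on the installation):

cd ~/flink-1.12.2
mkdir -p plugins/s3-fs-hadoop
cp opt/flink-s3-fs-hadoop-1.12.2.jar plugins/s3-fs-hadoop/

With the jar loaded through the plugins/ directory, the Maven dependency can normally stay at provided scope; bundling it into the application jar means the S3A classes may also be loaded from the user classpath, outside the plugin classloader that receives the s3a.* options from flink-conf.yaml.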
Hi Angelo,

I tried the failing case you provided with a similar one:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

However, it seems the job could be executed successfully. I further tried with the configuration and found that the exception is thrown if there is no s3a.access-key or s3a.secret-key configured. Could you check whether those two configuration items are taking effect?

Also, I only configured s3a.path-style: true, s3a.access-key and s3a.secret-key; is it possible to remove the other configuration items and have a try? (A sketch of that minimal configuration follows this message.)

Best,
Yun
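For reference, a minimal flink-conf.yaml matching the three options mentioned above would look roughly like this (key values are placeholders):

s3a.path-style: true
s3a.access-key: xxxxxxxxx
s3a.secret-key: xxxxxxxxx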
Hi Yun Gao,

Thank you for your prompt response. I've changed the table 'format' from 'parquet' to 'raw' as in your example, and I've been able to access the file (the unreadable characters appear to be the parquet file's binary content printed as raw strings):

Job has been submitted with JobID 441e7518bb615109624c1f33f222475b
+--------------------------------+
|                            url |
+--------------------------------+
| AR1 ���� | 2021-05-16 ���� ... |
| | 2021-05-16 |
| | 2021-05-16 ... |

So it seems the problem is linked to the parquet dependencies. This is my POM:

Another interesting fact: when exporting the access and secret keys as environment variables and adding

fs.s3a.aws.credentials.provider: com.amazonaws.auth.EnvironmentVariableCredentialsProvider

to flink-conf.yaml, I'm able to read and decode the parquet file properly:

Job has been submitted with JobID 15453b8be2bddcb49c1141a01013bf81
+--------------------------------+-------------+
|                           date |       value |
+--------------------------------+-------------+
|                     2021-05-16 |           7 |
|                     2021-05-16 |           8 |
|                     2021-05-16 |           8 |
|                     2021-05-16 |           8 |
|                     2021-05-16 |           1 |
|                     2021-05-16 |           3 |
|                     2021-05-16 |           9 |
|                     2021-05-16 |           9 |
|                     2021-05-16 |           8 |
|                     2021-05-16 |           7 |
|                     2021-05-16 |           9 |

We won't be able to set environment variables in production, so this is not a valid workaround for us (a possible classpath-based alternative is sketched after this message). Please have a look at the POM to see whether a dependency is missing or misused.

Thank you very much.

Angelo.

On Thu, May 20, 2021 at 11:54 AM Yun Gao <[hidden email]> wrote:
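A possible alternative to the environment variables, sketched under the assumption that the parquet code path builds its Hadoop S3A filesystem from the default Hadoop configuration on the classpath (the fs.s3a.* names are standard Hadoop S3A options; the values below are placeholders): provide an equivalent core-site.xml where the job can load it, e.g. via HADOOP_CONF_DIR.

<configuration>
  <property>
    <name>fs.s3a.access.key</name>
    <value>xxxxxxxxx</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>xxxxxxxxx</value>
  </property>
  <property>
    <name>fs.s3a.endpoint</name>
    <value>http://s3.xxxxxxx</value>
  </property>
  <property>
    <name>fs.s3a.path.style.access</name>
    <value>true</value>
  </property>
</configuration>

If this works where the flink-conf.yaml s3a.* keys do not, it would point to the parquet reader creating its own S3AFileSystem from the user classpath, bypassing the configuration that Flink forwards to its filesystem plugin.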