<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- POM model version; 4.0.0 matches the maven-4.0.0.xsd schema referenced on the root element. -->
<modelVersion>4.0.0</modelVersion>
<!-- Maven coordinates of the job artifact (groupId:artifactId:version), packaged as a plain JAR. -->
<groupId>org.apache.flink</groupId>
<artifactId>flink-s3</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<!-- NOTE(review): the display name looks like the quickstart archetype default; consider a project-specific name. -->
<name>Flink Quickstart Job</name>
<properties>
  <!-- Encoding used when plugins read or write source and resource files. -->
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

  <!-- Flink release, and the Scala binary suffix appended to the Scala-dependent Flink artifact ids. -->
  <flink.version>1.12.2</flink.version>
  <scala.binary.version>2.11</scala.binary.version>

  <!-- Java language level; also exposed through the standard maven.compiler.* properties. -->
  <target.java.version>1.8</target.java.version>
  <maven.compiler.source>${target.java.version}</maven.compiler.source>
  <maven.compiler.target>${target.java.version}</maven.compiler.target>

  <!-- Log4j 2 version for the runtime-scoped logging dependencies (console output in the IDE).
       Raised from 2.12.1, which is affected by CVE-2021-44228 ("Log4Shell"); 2.17.1 still supports Java 8. -->
  <log4j.version>2.17.1</log4j.version>
</properties>
<repositories>
  <!-- Apache snapshot repository: only SNAPSHOT artifacts are resolved from here, never releases. -->
  <repository>
    <id>apache.snapshots</id>
    <url>https://repository.apache.org/content/repositories/snapshots/</url>
    <name>Apache Development Snapshot Repository</name>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
    <releases>
      <enabled>false</enabled>
    </releases>
  </repository>
</repositories>
<dependencies>
  <!-- Apache Flink dependencies -->
  <!-- These are "provided": the Flink distribution supplies them at runtime, so they must not be
       packaged into the job JAR. Scala-dependent artifacts all take their suffix from
       ${scala.binary.version} so that the Scala binary version can only be changed in one place. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>

  <!-- Connector dependencies. They must be in the default scope (compile) so that they end up in the fat JAR. -->
  <!-- NOTE(review): Flink's documentation recommends loading the S3 file systems from the plugins
       directory instead of bundling them. This entry is kept in compile scope because the job was
       reported to fail with an S3AFileSystem "class not found" error without it; confirm before changing. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-s3-fs-hadoop</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>3.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <!-- Logging framework, to produce console output when running in the IDE. -->
  <!-- Runtime scope only; the shade configuration excludes org.apache.logging.log4j:* from the application JAR. -->
  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>${log4j.version}</version>
    <scope>runtime</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>${log4j.version}</version>
    <scope>runtime</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>${log4j.version}</version>
    <scope>runtime</scope>
  </dependency>
</dependencies>
<build>
  <plugins>
    <!-- Compiler: source and target level both come from ${target.java.version}. -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.1</version>
      <configuration>
        <source>${target.java.version}</source>
        <target>${target.java.version}</target>
      </configuration>
    </plugin>

    <!-- Shade: builds the fat JAR containing every dependency that is not excluded below. -->
    <!-- Update <mainClass>...</mainClass> whenever the program entry point changes. -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.1.1</version>
      <executions>
        <!-- Bind the shade goal to the package phase. -->
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <!-- Writes the Main-Class entry of the JAR manifest. -->
              <!-- NOTE(review): verify that this class exists in the job; the application shown with this
                   POM is launched with the entry class org.apache.flink.ReadTables given explicitly. -->
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>org.apache.flink.StreamingJob</mainClass>
              </transformer>
            </transformers>
            <artifactSet>
              <!-- Artifacts that are left out of the fat JAR. -->
              <excludes>
                <exclude>org.apache.flink:force-shading</exclude>
                <exclude>com.google.code.findbugs:jsr305</exclude>
                <exclude>org.slf4j:*</exclude>
                <exclude>org.apache.logging.log4j:*</exclude>
              </excludes>
            </artifactSet>
            <filters>
              <filter>
                <!-- Strip signature files from META-INF for every artifact;
                     copying them might cause SecurityExceptions when using the JAR. -->
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>

  <pluginManagement>
    <plugins>
      <!-- Eclipse m2e only: tells the IDE to ignore the plugin executions listed below,
           which resolves some warnings out of the box. -->
      <plugin>
        <groupId>org.eclipse.m2e</groupId>
        <artifactId>lifecycle-mapping</artifactId>
        <version>1.0.0</version>
        <configuration>
          <lifecycleMappingMetadata>
            <pluginExecutions>
              <!-- maven-shade-plugin 3.1.1 and later, goal "shade". -->
              <pluginExecution>
                <pluginExecutionFilter>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-shade-plugin</artifactId>
                  <versionRange>[3.1.1,)</versionRange>
                  <goals>
                    <goal>shade</goal>
                  </goals>
                </pluginExecutionFilter>
                <action>
                  <ignore/>
                </action>
              </pluginExecution>
              <!-- maven-compiler-plugin 3.1 and later, goals "testCompile" and "compile". -->
              <pluginExecution>
                <pluginExecutionFilter>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-compiler-plugin</artifactId>
                  <versionRange>[3.1,)</versionRange>
                  <goals>
                    <goal>testCompile</goal>
                    <goal>compile</goal>
                  </goals>
                </pluginExecutionFilter>
                <action>
                  <ignore/>
                </action>
              </pluginExecution>
            </pluginExecutions>
          </lifecycleMappingMetadata>
        </configuration>
      </plugin>
    </plugins>
  </pluginManagement>
</build>
</project>
Hi Angelo,

I tried the failing case you provided with a similar one:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();
TableEnvironment t_env = TableEnvironment.create(settings);
t_env.getConfig().getConfiguration().setString("parallelism.default", "1");
t_env.executeSql("CREATE TABLE example ( `url` STRING) WITH ( 'connector' = 'filesystem', 'path' = 's3a://whatnamedoyouwant/links', 'format' = 'raw')");
Table t1 = t_env.from("example");
t1.execute().print();

However, it seems the job could be executed successfully. I further experimented with the configuration and found that the exception is thrown if there is no s3a.access-key or s3a.secret-key configured. Could you check whether these two configuration items are actually taking effect? Also, I only configured s3a.path-style: true, s3a.access-key and s3a.secret-key; would it be possible to remove the other configuration items and try again?

Best,
Yun

------------------ Original Mail ------------------
Sender: Angelo G. <[hidden email]>
Send Date: Wed May 19 00:24:42 2021
Recipients: Flink User Mail List <[hidden email]>
Subject: Issue reading from S3

Hi,

I'm trying to read from and write to S3 with Flink 1.12.2. I'm submitting the job to a local cluster (tar.gz distribution). I do not have a Hadoop installation running on the same machine. S3 (not Amazon) is running in a remote location and I have access to it via an endpoint and access/secret keys.

The issue is that I'm able to read from and write to S3 when using the StreamExecutionEnvironment.readTextFile and DataStream.writeAsText methods, but I can't read from S3 when using the Table API.

This is the application:

package org.apache.flink;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class ReadTables {
    public static void main(String[] args) throws Exception {
        // CLASSIC API (PLAIN TEXT)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> ds = env.readTextFile("s3a://bucket/source.txt");
        ds.writeAsText("s3a://bucket/dest.txt", FileSystem.WriteMode.OVERWRITE);
        env.execute();

        // TABLE API
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();
        TableEnvironment t_env = TableEnvironment.create(settings);
        t_env.getConfig().getConfiguration().setString("parallelism.default", "1");
        t_env.executeSql("CREATE TABLE example ( `date` STRING, `value` INT) WITH ( 'connector' = 'filesystem', 'path' = 's3a://bucket/xxx/yyy/', 'format' = 'parquet')");
        Table t1 = t_env.from("example");
        t1.execute().print();
    }
}

The first job works properly, reading the source.txt file and writing it to dest.txt.

The second job does not work:

$~/flink-1.12.2$ ./bin/flink run -Dexecution.runtime-mode=BATCH -c org.apache.flink.ReadTables flink-s3-1.0-SNAPSHOT.jar;
Job has been submitted with JobID c690f2222aed0051d1501d5b9747b56f
Program execution finished
Job with JobID c690f2222aed0051d1501d5b9747b56f has finished.
Job Runtime: 17358 ms
Job has been submitted with JobID ebe54017faa83af33923d50892283e11
+--------------------------------+-------------+
| date | value |
+--------------------------------+-------------+
------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to fetch next result
Caused by: java.lang.RuntimeException: Failed to fetch next result
Caused by: java.io.IOException: Failed to fetch job execution result
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: ebe54017faa83af33923d50892283e11)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: ebe54017faa83af33923d50892283e11)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
Caused by: java.net.SocketTimeoutException: doesBucketExist on scib-des-cm-fipoac-medusa: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint:
Caused by: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint:
Caused by: com.amazonaws.SdkClientException: Failed to connect to service endpoint:
Caused by: java.net.SocketTimeoutException: connect timed out

I have the access credentials configured in the flink-conf.yaml file:

s3a.endpoint: http://s3.xxxxxxx
s3a.path-style: true
s3a.access-key: xxxxxxxxx
s3a.secret-key: xxxxxxxxx
s3a.entropy.key: _entropy_
s3a.entropy.length: 4
s3a.region: s3
s3a.bucket: xxxxxxxxx

I copied the flink-s3-fs-hadoop jar into the plugins folder, but I had to add it as a dependency (not provided) to the pom; otherwise an S3AFileSystem 'class not found' exception arises.

Thank you for your help,
Angelo.
Free forum by Nabble | Edit this page |