Hi, I want to get data from S3, process it, and send it to Kinesis:

1. Get gzip files from an S3 folder (s3://bucket/prefix)
2. Sort each file
3. Do some map/processing on each record in the file
4. Send to Kinesis

The idea is:

env.readTextFile(s3Folder)
   .sort(SortFunction)
   .map(MapFunction)
   .sink(KinesisSink)

I am struggling with reading the file from S3:

// Assume env is set up properly.
// The endpoint can be either a single file or a directory - "s3://<bucket>/<endpoint>"
final DataStreamSource<String> stringDataStreamSource = env.readTextFile(s3Folder);
stringDataStreamSource.print();

It keeps failing with an error saying I need some kind of HDFS setup? I don't want anything to do with HDFS; I just want to read from S3. I saw a StackOverflow answer, by David Anderson I think, about using the Flink SQL API. I would appreciate any decent example to get the reading from S3 working.

TIA,
Vijay
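For reference, here is a minimal sketch of that kind of pipeline, assuming a working S3 filesystem plugin; the region, stream name, and map logic are placeholders, and note that the DataStream API has no built-in sort(), so that step would need its own operator:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

public class S3ToKinesisSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // readTextFile accepts s3:// URIs once an S3 filesystem plugin is installed;
        // .gz files are decompressed transparently based on their extension.
        DataStream<String> lines = env.readTextFile("s3://bucket/prefix");

        // Placeholder per-record processing.
        DataStream<String> processed = lines.map(String::toUpperCase);

        Properties producerConfig = new Properties();
        producerConfig.put(AWSConfigConstants.AWS_REGION, "us-west-2"); // placeholder region

        FlinkKinesisProducer<String> sink =
                new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
        sink.setDefaultStream("my-stream"); // placeholder stream name
        sink.setDefaultPartition("0");

        processed.addSink(sink);
        env.execute("s3-to-kinesis");
    }
}

The Kinesis sink here is the FlinkKinesisProducer from the flink-connector-kinesis artifact mentioned later in this thread.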
Hi Vijay,

Can you post the error you are referring to? Did you properly set up an S3 filesystem plugin (https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/)?

On Fri, Sep 11, 2020 at 8:42 AM Vijay Balakrishnan <[hidden email]> wrote:
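For reference, the setup on that page amounts to giving the jar its own folder under plugins/ in the distribution root; for 1.11.1 (the jar ships in opt/, the folder name is by convention):

mkdir ./plugins/s3-fs-hadoop
cp ./opt/flink-s3-fs-hadoop-1.11.1.jar ./plugins/s3-fs-hadoop/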
Hi Robert,

Thanks for the link. Is there a simple example I can use as a starting template for using S3 with pom.xml?

I copied flink-s3-fs-hadoop-1.11.1.jar into the plugins/s3-fs-hadoop directory. Running from flink-1.11.1/:

flink run -cp ../target/monitoring-rules-influx-1.0.jar -jar /Users/vkbalakr/work/flink-examples/understanding-apache-flink/03-processing-infinite-streams-of-data/monitoring-rules-influx/target/monitoring-rules-influx-1.0.jar

Caused by: java.io.IOException: Cannot find any jar files for plugin in directory [/Users/vkbalakr/flink/flink-1.11.1 2/plugins/s3-fs-hadoop]. Please provide the jar files for the plugin or delete the directory.
    at org.apache.flink.core.plugin.DirectoryBasedPluginFinder.createJarURLsFromDirectory(DirectoryBasedPluginFinder.java:97)

IntelliJ IDEA: (I copied flink-s3-fs-hadoop-1.11.1.jar into the plugins/s3-fs-hadoop directory.) How do I connect that to the pom.xml so it runs inside IntelliJ, which pulls from the Apache repo?

pom.xml - added Hadoop dependencies:

<dependencies>
    <!-- Hadoop dependencies -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- Apache Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

This gives:

Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
    at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)

TIA,

On Fri, Sep 11, 2020 at 11:09 AM Robert Metzger <[hidden email]> wrote:
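A note on the last stack trace: in Flink 1.11, flink-clients is no longer a transitive dependency of flink-streaming-java, and "No ExecutorFactory found" when launching a job from the IDE is the usual symptom. Adding it explicitly to the pom.xml should resolve that part:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>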
My problem was that the plugin jar needs to be under plugins/s3-fs-hadoop. Running the code with the following added to flink-conf.yaml:

s3.access-key:
s3.secret-key:

Removed all Hadoop dependencies from pom.xml.

cd /<flink-dir>
./bin/start-cluster.sh
./bin/flink run xyz.jar

Still struggling with how to get it to work with the pom.xml in IntelliJ IDEA.

On Mon, Sep 14, 2020 at 12:13 PM Vijay Balakrishnan <[hidden email]> wrote:
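For the remaining IntelliJ question, one common workaround (a sketch, not an official recipe): the plugins/ mechanism does not exist when running inside the IDE, so add flink-s3-fs-hadoop as a plain Maven dependency for local development only, and initialize the filesystem with the credentials before building the job. The key names mirror the flink-conf.yaml entries above:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalS3ReadTest {
    public static void main(String[] args) throws Exception {
        // Assumes flink-s3-fs-hadoop is on the classpath as a regular Maven
        // dependency (local development only; use plugins/ on a real cluster).
        Configuration conf = new Configuration();
        conf.setString("s3.access-key", "...");
        conf.setString("s3.secret-key", "...");
        FileSystem.initialize(conf); // registers the S3 filesystem for s3:// URIs

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.readTextFile("s3://bucket/prefix").print();
        env.execute("local-s3-read-test");
    }
}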