Struggling with reading the file from s3 as Source

4 messages

Struggling with reading the file from s3 as Source

Vijay Balakrishnan
Hi,

I want to get data from S3, process it, and send it to Kinesis:
1. Get gzip files from an S3 folder (s3://bucket/prefix)
2. Sort each file
3. Do some map/processing on each record in the file
4. Send the results to Kinesis

The idea is:
env.readTextFile(s3Folder)
.sort(SortFunction)
.map(MapFunction)
.sink(KinesisSink)
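
For reference, a minimal Java sketch of that pipeline, assuming the flink-connector-kinesis dependency and a working S3 filesystem are available; the bucket, prefix, region, and stream name are placeholders, and since DataStream has no built-in per-file sort, that step is omitted here:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

public class S3ToKinesisJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // readTextFile accepts a directory path and decompresses .gz input based on the file extension
        final DataStream<String> lines = env.readTextFile("s3://my-bucket/my-prefix/"); // placeholder path

        final DataStream<String> processed = lines.map(String::trim); // stand-in for the real per-record processing

        final Properties producerConfig = new Properties();
        producerConfig.put(AWSConfigConstants.AWS_REGION, "us-west-2"); // placeholder region
        final FlinkKinesisProducer<String> kinesisSink =
                new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
        kinesisSink.setDefaultStream("my-output-stream"); // placeholder stream name
        kinesisSink.setDefaultPartition("0");

        processed.addSink(kinesisSink);
        env.execute("s3-to-kinesis");
    }
}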

I'm struggling with reading the files from S3:
// Assume env is set up properly
// The endpoint can either be a single file or a directory - "s3://<bucket>/<endpoint>"
final DataStreamSource<String> stringDataStreamSource = env.readTextFile(s3Folder);
stringDataStreamSource.print();

It keeps failing with an error saying I need some kind of HDFS setup. I don't want anything to do with HDFS;
I just want to read from S3.
I saw a mention on Stack Overflow, by David Anderson I think, of using the Flink SQL API.
I would appreciate any decent example of getting the read from S3 working.

TIA,
Vijay


Re: Struggling with reading the file from s3 as Source

Robert Metzger (rmetzger0)
Hi Vijay,

Can you post the error you are referring to? 


Re: Struggling with reading the file from s3 as Source

Vijay Balakrishnan
Hi Robert,
Thanks for the link.
Is there a simple example I can use as a starting template for using S3 with a pom.xml?

I copied flink-s3-fs-hadoop-1.11.1.jar into the plugins/s3-fs-hadoop directory.
Running from flink-1.11.1/:
flink run -cp ../target/monitoring-rules-influx-1.0.jar -jar /Users/vkbalakr/work/flink-examples/understanding-apache-flink/03-processing-infinite-streams-of-data/monitoring-rules-influx/target/monitoring-rules-influx-1.0.jar
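
As an aside, the Flink CLI has no -jar option, and -cp is not a flink run flag either; assuming the main class is recorded in the jar manifest, the usual form would be just

./bin/flink run path/to/monitoring-rules-influx-1.0.jar

(or ./bin/flink run -c com.example.Main ... otherwise, with com.example.Main a placeholder). The command above still got far enough to fail during plugin discovery: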

Caused by: java.io.IOException: Cannot find any jar files for plugin in directory [/Users/vkbalakr/flink/flink-1.11.1 2/plugins/s3-fs-hadoop].
Please provide the jar files for the plugin or delete the directory.
at org.apache.flink.core.plugin.DirectoryBasedPluginFinder.createJarURLsFromDirectory(DirectoryBasedPluginFinder.java:97)

IntelliJ IDEA (again with flink-s3-fs-hadoop-1.11.1.jar copied into the plugins/s3-fs-hadoop directory):
How do I connect that to the pom.xml so the job runs inside IntelliJ, which resolves dependencies from the Apache repo?
pom.xml:
I added the Hadoop dependencies:
<dependencies>
    <!-- Hadoop dependencies -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>${hadoop.version}</version>
    </dependency>

    <!-- Apache Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

This gives:
Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)

TIA,


Re: Struggling with reading the file from s3 as Source

Vijay Balakrishnan
My problem was that the plugin jar needs to be under plugins/s3-fs-hadoop. The code runs now with this added to flink-conf.yaml:
s3.access-key:
s3.secret-key:

I removed all the Hadoop dependencies from the pom.xml.

cd <flink-dir>
./bin/start-cluster.sh
./bin/flink run xyz.jar

Still struggling with how to get it to work with the pom.xml in IntelliJ IDEA.
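
A hedged note on the open IntelliJ question: the plugins/ directory is only picked up by the Flink distribution's startup scripts, not by a run launched from the IDE. A common workaround for local testing (not an official setup) is to declare the S3 filesystem as an ordinary dependency, so it registers itself through the service loader:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-s3-fs-hadoop</artifactId>
    <version>${flink.version}</version>
</dependency>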
