How to stream CSV from S3?

How to stream CSV from S3?

John Smith
Hi, using Flink 1.10

1- How do we go about reading CSV files that are copied to s3 buckets?
2- Is there a source that can tail S3 and start reading a CSV when it is copied to S3?
3- Is that part of the table APIs?

Re: How to stream CSV from S3?

Jingsong Li
Hi John,

Do you mean you want to read S3 CSV files using partition/bucket pruning?

If you are just using the DataSet API, you can use CsvInputFormat to read CSV files.

If you want to use the Table/SQL API: in 1.10, the CSV format in the Table API does not support partitioned tables, so the only way is to specify the partition/bucket path and read a single directory.

In 1.11, the Table/SQL filesystem connector with the CSV format supports partitioned tables, with complete support for partition semantics.
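
To make "specify the partition path and read a single directory" a bit more concrete, here is a minimal plain-Java sketch. It assumes a Hive-style layout where each partition is a `key=value` subdirectory; the thread does not say how the bucket is organized, and the class name, bucket name and `dt` key are all made up for illustration. It only shows the path arithmetic, not any Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: manual "partition pruning" by path.
final class PartitionPaths {

    /** Builds "<tableRoot>/<key>=<value>/" for a single partition. */
    static String partitionDir(String tableRoot, String key, String value) {
        String root = tableRoot.endsWith("/") ? tableRoot : tableRoot + "/";
        return root + key + "=" + value + "/";
    }

    /** Keeps only the files that live under the given partition directory. */
    static List<String> prune(List<String> files, String partitionDir) {
        List<String> kept = new ArrayList<>();
        for (String file : files) {
            if (file.startsWith(partitionDir)) {
                kept.add(file);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Assumed layout: s3://my-bucket/events/dt=<date>/<file>.csv
        String dir = partitionDir("s3://my-bucket/events", "dt", "2020-07-27");
        List<String> files = Arrays.asList(
            "s3://my-bucket/events/dt=2020-07-27/a.csv",
            "s3://my-bucket/events/dt=2020-07-28/b.csv");
        System.out.println(dir);
        System.out.println(prune(files, dir));
    }
}
```

In 1.10 the resulting directory string is what you would hand to the reader as its input path.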


Best,
Jingsong

On Mon, Jul 27, 2020 at 10:54 PM John Smith <[hidden email]> wrote:
Hi, using Flink 1.10

1- How do we go about reading CSV files that are copied to s3 buckets?
2- Is there a source that can tail S3 and start reading a CSV when it is copied to S3?
3- Is that part of the table APIs?


--
Best, Jingsong Lee

Re: How to stream CSV from S3?

John Smith
Basically, I want to "monitor" a bucket on S3, and read and stream every file that gets created in that bucket.

If I understand correctly, I can just use env.readCsvFile() and configure it to continuously read a folder path?


On Tue., Jul. 28, 2020, 1:38 a.m. Jingsong Li, <[hidden email]> wrote:
Hi John,

Do you mean you want to read S3 CSV files using partition/bucket pruning?

If you are just using the DataSet API, you can use CsvInputFormat to read CSV files.

If you want to use the Table/SQL API: in 1.10, the CSV format in the Table API does not support partitioned tables, so the only way is to specify the partition/bucket path and read a single directory.

In 1.11, the Table/SQL filesystem connector with the CSV format supports partitioned tables, with complete support for partition semantics.


Best,
Jingsong


Re: How to stream CSV from S3?

Jingsong Li
Yes, you can try `StreamExecutionEnvironment.readFile(RowCsvInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, monitorInterval)`. (And wrap it in a table if you want.)

On Tue, Jul 28, 2020 at 3:46 PM John Smith <[hidden email]> wrote:
Basically, I want to "monitor" a bucket on S3, and read and stream every file that gets created in that bucket.

If I understand correctly, I can just use env.readCsvFile() and configure it to continuously read a folder path?



Re: How to stream CSV from S3?

John Smith
In reply to this post by John Smith
Also, this is where I find the docs confusing in the "connectors" section. File system isn't listed under data streaming, but env.readCsvFile seems like it can do the trick?


Re: How to stream CSV from S3?

Jingsong Li
- `env.readCsvFile` is in the DataSet API; it just reads the full amount of data once, in batch mode.
- `streamEnv.readFile(RowCsvInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, monitorInterval)` can monitor a directory and continue reading in streaming mode.
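
The difference between reading once and monitoring a directory can be pictured with a small plain-Java sketch. This is only an illustration of the polling idea, not Flink's implementation: `DirectoryPoller` is a made-up class, it remembers file names only (as far as I know, Flink's monitor works from modification times instead), and a real job would call it on a timer with the monitor interval.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

// Hypothetical illustration: each poll returns the lines of files not seen before.
final class DirectoryPoller {
    private final Path directory;
    private final Set<Path> seen = new HashSet<>();

    DirectoryPoller(Path directory) {
        this.directory = directory;
    }

    /** Scans the directory once and reads every regular file that is new since the last scan. */
    List<String> pollOnce() throws IOException {
        List<Path> fresh = new ArrayList<>();
        try (Stream<Path> entries = Files.list(directory)) {
            entries.filter(Files::isRegularFile)
                   .filter(seen::add) // Set.add returns true only for unseen paths
                   .sorted()
                   .forEach(fresh::add);
        }
        List<String> lines = new ArrayList<>();
        for (Path file : fresh) {
            lines.addAll(Files.readAllLines(file));
        }
        return lines;
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("csv-poll");
        DirectoryPoller poller = new DirectoryPoller(dir);

        Files.write(dir.resolve("a.csv"), Arrays.asList("id,name", "1,foo"));
        System.out.println(poller.pollOnce()); // lines of a.csv

        Files.write(dir.resolve("b.csv"), Arrays.asList("id,name", "2,bar"));
        System.out.println(poller.pollOnce()); // only the lines of b.csv

        System.out.println(poller.pollOnce()); // nothing new, empty list
    }
}
```

A batch read corresponds to calling `pollOnce()` a single time; the streaming mode corresponds to calling it repeatedly for as long as the job runs.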

On Tue, Jul 28, 2020 at 3:54 PM John Smith <[hidden email]> wrote:
Also, this is where I find the docs confusing in the "connectors" section. File system isn't listed under data streaming, but env.readCsvFile seems like it can do the trick?


Re: How to stream CSV from S3?

John Smith
Hi, is there an example of how RowCsvInputFormat is initialized?

On Tue, 28 Jul 2020 at 04:00, Jingsong Li <[hidden email]> wrote:
- `env.readCsvFile` is in the DataSet API; it just reads the full amount of data once, in batch mode.
- `streamEnv.readFile(RowCsvInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, monitorInterval)` can monitor a directory and continue reading in streaming mode.


Re: How to stream CSV from S3?

Arvid Heise-3
Hi John,

I found an example on SO [1] in Scala.


On Tue, Jul 28, 2020 at 4:29 PM John Smith <[hidden email]> wrote:
Hi, is there an example of how RowCsvInputFormat is initialized?



--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   

Re: How to stream CSV from S3?

John Smith
Hi, yes, it works :)

For the Java guys...

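import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.RowCsvInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.types.Row;

public class StreamCsvJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Directory to monitor (a local path here; the use case in this thread would point it at the S3 bucket).
        String path = "file:///foo/bar";

        // One TypeInformation per CSV column: two string columns in this example.
        TypeInformation[] fieldTypes = new TypeInformation[]{
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO
        };

        RowCsvInputFormat csvFormat = new RowCsvInputFormat(new Path(path), fieldTypes);
        csvFormat.setSkipFirstLineAsHeader(true);

        // Re-scan the path every 1000 ms and keep reading files as they appear.
        DataStreamSource<Row> lines =
            env.readFile(csvFormat, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000);

        lines.print();
        env.execute("stream-csv"); // nothing runs until the job is submitted
    }
}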
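
For anyone trying to picture what the two string field types and the header flag amount to per file, here is a rough plain-Java approximation. It is not Flink's parser: `SimpleCsvRows` is a made-up name, and naive comma splitting ignores quoting and escaping, which a real CSV parser has to handle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for "skip the header, then turn each line into a row of string fields".
final class SimpleCsvRows {

    static List<String[]> parse(List<String> lines, boolean skipHeader, int fieldCount) {
        List<String[]> rows = new ArrayList<>();
        for (int i = skipHeader ? 1 : 0; i < lines.size(); i++) {
            String line = lines.get(i);
            if (line.isEmpty()) {
                continue; // ignore blank lines
            }
            String[] fields = line.split(",", -1); // -1 keeps trailing empty fields
            if (fields.length != fieldCount) {
                throw new IllegalArgumentException(
                    "Expected " + fieldCount + " fields but got " + fields.length + ": " + line);
            }
            rows.add(fields);
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> file = Arrays.asList("id,name", "1,foo", "2,bar");
        for (String[] row : parse(file, true, 2)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```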

On Thu, 30 Jul 2020 at 05:42, Arvid Heise <[hidden email]> wrote:
Hi John,

I found an example on SO [1] in Scala.

