Hi, using Flink 1.10.

1- How do we go about reading CSV files that are copied to S3 buckets?
2- Is there a source that can tail S3 and start reading a CSV when it is copied to S3?
3- Is that part of the Table APIs?
Hi John,

Do you mean you want to read S3 CSV files using partition/bucket pruning?

If you are just using the DataSet API, you can use CsvInputFormat to read CSV files.

If you want to use the Table/SQL API: in 1.10, the CSV format for tables does not support partitioned tables, so the only way is to specify the partition/bucket path and read a single directory. In 1.11, the Table/SQL filesystem connector with the CSV format supports partitioned tables, with complete support for partition semantics [1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html

On Mon, Jul 27, 2020 at 10:54 PM John Smith <[hidden email]> wrote:

Best,
Jingsong Lee
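For readers on 1.11, the partitioned filesystem table described above might look roughly like the following. This is only a sketch under assumptions: the table name `csv_events`, its columns, the `dt` partition column and the bucket path are all hypothetical, the job runs in batch mode, and `flink-csv` plus an S3 filesystem plugin are assumed to be available.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PartitionedCsvTable {

    // Builds the DDL for a CSV table partitioned by a hypothetical "dt" column.
    // Files are assumed to be laid out as <path>/dt=2020-07-27/part-0.csv and so on.
    static String createTableDdl(String path) {
        return "CREATE TABLE csv_events (\n"
            + "  name STRING,\n"
            + "  amount DOUBLE,\n"
            + "  dt STRING\n"
            + ") PARTITIONED BY (dt) WITH (\n"
            + "  'connector' = 'filesystem',\n"
            + "  'path' = '" + path + "',\n"
            + "  'format' = 'csv'\n"
            + ")";
    }

    public static void main(String[] args) {
        // Blink planner in batch mode (Flink 1.11 API).
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inBatchMode()
            .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Hypothetical bucket and prefix.
        tEnv.executeSql(createTableDdl("s3://my-bucket/events/"));

        // Filtering on the partition column lets the planner read only the matching directory.
        tEnv.executeSql("SELECT name, amount FROM csv_events WHERE dt = '2020-07-27'").print();
    }
}
```

On 1.10 the equivalent would be to point the table at a single partition directory, as described above.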
Basically, I want to "monitor" a bucket on S3 and, for every file that gets created in that bucket, read it and stream it. If I understand correctly, I can just use env.readCsvFile() and configure it to continuously read a folder path?

On Tue., Jul. 28, 2020, 1:38 a.m. Jingsong Li, <[hidden email]> wrote:
Yes, you can try `StreamExecutionEnvironment.readFile(RowCsvInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, monitorInterval)`. (And wrap it in a table if you want.)

On Tue, Jul 28, 2020 at 3:46 PM John Smith <[hidden email]> wrote:

Best,
Jingsong Lee
Also, this is where I find the docs confusing: in the "connectors" section, the file system isn't listed under data streaming, but env.readCsvFile seems like it can do the trick?

On Tue., Jul. 28, 2020, 3:46 a.m. John Smith, <[hidden email]> wrote:
- `env.readCsvFile` belongs to the DataSet API; it just reads the full amount of data once, in batch mode.
- `streamEnv.readFile(RowCsvInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, monitorInterval)` can monitor a directory and keep reading in streaming mode.

On Tue, Jul 28, 2020 at 3:54 PM John Smith <[hidden email]> wrote:

Best,
Jingsong Lee
Hi, is there an example of how RowCsvInputFormat is initialized?

On Tue, 28 Jul 2020 at 04:00, Jingsong Li <[hidden email]> wrote:
On Tue, Jul 28, 2020 at 4:29 PM John Smith <[hidden email]> wrote:
-- Arvid Heise | Senior Java Developer, Ververica GmbH
Hi, yes it works :) For the Java guys...

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

On Thu, 30 Jul 2020 at 05:42, Arvid Heise <[hidden email]> wrote:
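The Java snippet in the last message is cut off after the first line in this archive. A minimal sketch of how `RowCsvInputFormat` could be initialized and passed to `readFile` on Flink 1.10 is given here; it is not the original poster's code. The bucket path, the three-column schema, the header skipping and the 10-second monitor interval are all assumptions for illustration, and reading `s3://` paths assumes one of Flink's S3 filesystem plugins is installed.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.RowCsvInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.types.Row;

public class S3CsvMonitorJob {

    // Hypothetical bucket and prefix to watch.
    static final String INPUT_PATH = "s3://my-bucket/incoming/";

    // Builds a CSV input format for a hypothetical schema: name (STRING), age (INT), score (DOUBLE).
    static RowCsvInputFormat createFormat(String path) {
        TypeInformation<?>[] fieldTypes = {Types.STRING, Types.INT, Types.DOUBLE};
        RowCsvInputFormat format = new RowCsvInputFormat(new Path(path), fieldTypes);
        // Assumes every file starts with a header line.
        format.setSkipFirstLineAsHeader(true);
        return format;
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Re-scan the directory every 10 seconds and read files as they appear.
        DataStream<Row> rows = env.readFile(
            createFormat(INPUT_PATH),
            INPUT_PATH,
            FileProcessingMode.PROCESS_CONTINUOUSLY,
            10_000L);

        rows.print();
        env.execute("s3-csv-monitor");
    }
}
```

Note that with `PROCESS_CONTINUOUSLY`, a file that is modified after it has been read is processed again in full, so this works best when files are written once and then left alone.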