Table SQL Filesystem CSV recursive directory traversal



Ruben Laguna
Is it possible? 

For the DataSet API I've found [1]:

Configuration parameters = new Configuration();
parameters.setBoolean("recursive.file.enumeration", true);

// pass the configuration to the data source
DataSet<String> logs = env.readTextFile("file:///path/with.nested/files")
                          .withParameters(parameters);

But can I achieve something similar with Table SQL?

I have the following directory structure
/myfiles/20201010/00/00restoffilename1.csv
/myfiles/20201010/00/00restoffilename2.csv
...
/myfiles/20201010/00/00restoffilename3000.csv
/myfiles/20201010/01/01restoffilename1.csv
....
/myfiles/20201010/00/00restoffilename3000.csv

So for each day I have 256 subdirectories, from 00 to FF, and each of those directories can have 1000-3000 files; I would like to load all those files in one go.

[1]: https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#recursive-traversal-of-the-input-path-directory
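For reference, the effect of `recursive.file.enumeration` is plain recursive directory traversal. A minimal stand-alone sketch in plain Java (not Flink API; class and method names are illustrative only) that collects CSV files under such a nested layout:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class RecursiveCsvListing {
    // Collect all *.csv files under root, descending into every
    // subdirectory, similar to what recursive enumeration does.
    public static List<Path> listCsvFiles(Path root) throws IOException {
        List<Path> result = new ArrayList<>();
        try (Stream<Path> walk = Files.walk(root)) {
            walk.filter(Files::isRegularFile)
                .filter(p -> p.toString().endsWith(".csv"))
                .forEach(result::add);
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Build a small directory tree like the one described above.
        Path root = Files.createTempDirectory("myfiles");
        Path sub = Files.createDirectories(root.resolve("20201010").resolve("00"));
        Files.createFile(sub.resolve("00restoffilename1.csv"));
        Files.createFile(sub.resolve("00restoffilename2.csv"));
        System.out.println(listCsvFiles(root).size()); // prints 2
    }
}
```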


Re: Table SQL Filesystem CSV recursive directory traversal

Timo Walther
Hi Ruben,

By looking at the code, it seems you should be able to do that. At least
for batch workloads we are using
org.apache.flink.formats.csv.CsvFileSystemFormatFactory.CsvInputFormat,
which is a FileInputFormat that supports the mentioned configuration option.

The problem is that this might not have been exposed via SQL properties
yet, so you would need to write your own property-to-InputFormat factory,
similar to:

https://github.com/apache/flink/blob/master/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileSystemFormatFactory.java

What you could do is create your own factory that extends the one above,
so you can set additional properties. Not a nice solution, but a
workaround for now.
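The shape of this workaround is a delegating factory: extend the existing factory, let the parent build the format, then flip the recursive flag before returning it. A stand-alone plain-Java sketch of the pattern (all class and method names here are hypothetical stand-ins, not Flink's actual API):

```java
// Illustrative stand-in for a format object produced by a factory.
class TextFormat {
    private boolean nested = false;
    void setNestedEnumeration(boolean nested) { this.nested = nested; }
    boolean isNestedEnumeration() { return nested; }
}

// Stand-in for the existing factory (e.g. the built-in CSV factory).
class BaseFormatFactory {
    TextFormat createFormat() { return new TextFormat(); }
}

// The workaround: extend the base factory and post-configure the format.
class RecursiveFormatFactory extends BaseFormatFactory {
    @Override
    TextFormat createFormat() {
        TextFormat format = super.createFormat();
        format.setNestedEnumeration(true); // enable recursive traversal
        return format;
    }
}

public class FactoryWorkaroundSketch {
    public static void main(String[] args) {
        TextFormat f = new RecursiveFormatFactory().createFormat();
        System.out.println(f.isNestedEnumeration()); // prints true
    }
}
```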

More information on how to write your own factory can also be found here:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html

I hope this helps.

Regards,
Timo

On 09.11.20 09:27, Ruben Laguna wrote:

> Is it possible?
>
> For Dataset I've found [1] :
>
> parameters.setBoolean("recursive.file.enumeration", true);
>
> // pass the configuration to the data source
> DataSet<String> logs = env.readTextFile("file:///path/with.nested/files")
>                           .withParameters(parameters);
>
>
> But can I achieve something similar with the Table SQL?
>
> I have the following directory structure
> /myfiles/20201010/00/00restoffilename1.csv
> /myfiles/20201010/00/00restoffilename2.csv
> ...
> /myfiles/20201010/00/00restoffilename3000.csv
> /myfiles/20201010/01/01restoffilename1.csv
> ....
> /myfiles/20201010/00/00restoffilename3000.csv
>
> So for each day I have 255  subdirectories from 00 to  FF and each of
> those directories can have 1000-3000 files and I would like to load all
> those files in one go.
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#recursive-traversal-of-the-input-path-directory 
>
> --
> /Rubén


Re: Table SQL Filesystem CSV recursive directory traversal

Danny Chan-2
In the current master code base, all FileInputFormats add files recursively under the given paths by default (see e.g. the #addFilesInDir method).

So it should be supported by default for SQL.
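The #addFilesInDir behaviour referred to above is essentially the following recursion (a simplified stand-alone sketch in plain Java, not Flink's actual implementation):

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class AddFilesInDirSketch {
    // Simplified recursive enumeration: for each child of dir,
    // descend if it is a directory, otherwise record the file.
    static void addFilesInDir(File dir, List<File> out) {
        File[] children = dir.listFiles();
        if (children == null) {
            return; // not a directory, or not readable
        }
        for (File child : children) {
            if (child.isDirectory()) {
                addFilesInDir(child, out);
            } else {
                out.add(child);
            }
        }
    }

    public static void main(String[] args) {
        List<File> files = new ArrayList<>();
        addFilesInDir(new File("."), files);
        System.out.println(files.size() + " files found");
    }
}
```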
