I've asked this already on [stackoverflow][1]
Is there anything equivalent to Spark's `f.input_file_name()`? I don't see anything that could be used among the [system functions][2].

I have a dataset where some information is embedded in the file names (200k files), and I need to extract it as a new column. In Spark I could write `.withColumn("id", f.split(f.reverse(f.split(f.input_file_name(), '/'))[0], '\.')[0])`, but I don't see how to do the same with Flink. Is it possible?

I don't see [any JIRA issue about it either][3]. Is it something that has already been discussed?

[1]: https://stackoverflow.com/questions/64607839/is-there-an-equivalent-to-sparks-f-input-file-name-function-in-apache-flink
[2]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
[3]: https://issues.apache.org/jira/browse/FLINK-8275?jql=project%20%3D%20FLINK%20AND%20text%20~%20%22filename%22

--
/Rubén
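For readers less familiar with the Spark expression in the question: it takes the last `/`-separated component of the input path (the file name) and keeps everything before the first literal `.`. A minimal plain-Python sketch of the same string logic, independent of Spark or Flink (the function name `id_from_path` and the sample path are hypothetical, for illustration only):

```python
import re


def id_from_path(path: str) -> str:
    """Mimic f.split(f.reverse(f.split(path, '/'))[0], '\\.')[0] on a single string."""
    # f.split(path, '/') -> list of path components
    parts = path.split("/")
    # f.reverse(...)[0] -> first element of the reversed list, i.e. the file name
    file_name = parts[::-1][0]
    # f.split(file_name, '\.')[0] -> everything before the first literal dot
    return re.split(r"\.", file_name)[0]


print(id_from_path("file:///data/mycsvfiles/abc123.part-0.csv"))
```

Once a source exposes the file path as a column, this is the per-row transformation that would still need to be expressed in Flink.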
I am afraid there is no such functionality available yet.
I do think it is a valid request, though. We could use the metadata columns from the upcoming FLIP-107 [1] for this purpose and expose the file name as a metadata column of the filesystem source. Would you like to create a JIRA issue for it?

Best,
Dawid

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors

On 30/10/2020 13:21, Ruben Laguna wrote:
Sure, I'll write the JIRA issue.

/Rubén

On Fri, 30 Oct 2020 at 13:27, Dawid Wysakowicz <[hidden email]> wrote:
I've written [FLINK-19903][1]. I just read [FLIP-107][2] and [FLINK-15869][3], and I need to ask: assuming that FLIP-107 / FLINK-15869 is implemented and the filesystem SQL connector is modified to expose metadata (including the path and possibly other attributes), to use it I would need to write:

CREATE TABLE table1 (
  `text` VARCHAR  -- each CSV row is just a single text column
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///Users/ecerulm/mycsvfiles/',
  'format' = 'csv',
  'include.metadata' = 'path,size'  -- tell the filesystem connector to add two extra columns called `flink-filesystem-metadata.path` and `flink-filesystem-metadata.size`
);

Is that right?

/Rubén

On Fri, Oct 30, 2020 at 1:29 PM Ruben Laguna <[hidden email]> wrote:
No, you would write:

CREATE TABLE table1 (
  `text` VARCHAR,                   -- each CSV row is just a single text column
  `path` VARCHAR METADATA VIRTUAL,  -- where path is a name declared by the filesystem source
  `size` VARCHAR METADATA VIRTUAL   -- where size is a name declared by the filesystem source
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///Users/ecerulm/mycsvfiles/',
  'format' = 'csv'
);
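To make the semantics of such `METADATA VIRTUAL` columns concrete, here is a plain-Python model of a source that reads every CSV file in a directory and attaches each file's path and size to every row it emits. This is only an illustration of the idea, not Flink code and not how the filesystem connector would actually implement it; the names `Row` and `read_with_file_metadata` and the one-text-column layout are assumptions for the sketch:

```python
import csv
import tempfile
from pathlib import Path
from typing import Iterator, NamedTuple


class Row(NamedTuple):
    text: str  # the single physical CSV column
    path: str  # "virtual" metadata: path of the file the row came from
    size: int  # "virtual" metadata: size of that file in bytes


def read_with_file_metadata(directory: str) -> Iterator[Row]:
    """Yield every row of every *.csv file in `directory`, tagged with file metadata."""
    for csv_file in sorted(Path(directory).glob("*.csv")):
        size = csv_file.stat().st_size
        with csv_file.open(newline="") as fh:
            for record in csv.reader(fh):
                if record:  # skip blank lines
                    yield Row(text=record[0], path=str(csv_file), size=size)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        Path(tmp, "id42.csv").write_text("hello\nworld\n")
        for row in read_with_file_metadata(tmp):
            # Derive an id from the path, as the Spark expression in the question does
            file_id = Path(row.path).name.split(".")[0]
            print(row.text, file_id, row.size)
```

The metadata values are read-only properties of the file, which is why such columns are declared `VIRTUAL` (excluded when writing to the table) rather than as physical columns of the CSV format.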
On 30/10/2020 14:25, Ruben Laguna wrote: