Can I get the filename as a column?

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Can I get the filename as a column?

Ruben Laguna
I've asked this already on [stackoverflow][1]

Is there anything equivalent to Spark's `f.input_file_name()` ?  I
don't see anything that could be used in [system functions][2]

I have a dataset where they embedded some information in the filenames
(200k files) and I need to extract that as a new column.

In Spark I could `
.withColumn("id",f.split(f.reverse(f.split(f.input_file_name(),'/'))[0],'\.')[0])`
 but I don't see how can I do the same with Flink. Is it possible?

I don't see [any JIRA issue about it either][3]. Is it something that
has already been discussed?


[1]: https://stackoverflow.com/questions/64607839/is-there-an-equivalent-to-sparks-f-input-file-name-function-in-apache-flink
[2]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
[3]: https://issues.apache.org/jira/browse/FLINK-8275?jql=project%20%3D%20FLINK%20AND%20text%20~%20%22filename%22
--
/Rubén
Reply | Threaded
Open this post in threaded view
|

Re: Can I get the filename as a column?

Dawid Wysakowicz-2
I am afraid there is no such functionality available yet.

I think though it is a valid request. I think we can use the upcoming
FLIP-107 metadata columns for this purpose and expose the file name as
metadata column of a filesystem source.

Would you like to create a JIRA issue for it?

Best,

Dawid

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors

On 30/10/2020 13:21, Ruben Laguna wrote:

> I've asked this already on [stackoverflow][1]
>
> Is there anything equivalent to Spark's `f.input_file_name()` ?  I
> don't see anything that could be used in [system functions][2]
>
> I have a dataset where they embedded some information in the filenames
> (200k files) and I need to extract that as a new column.
>
> In Spark I could `
> .withColumn("id",f.split(f.reverse(f.split(f.input_file_name(),'/'))[0],'\.')[0])`
>  but I don't see how can I do the same with Flink. Is it possible?
>
> I don't see [any JIRA issue about it either][3]. Is it something that
> has already been discussed?
>
>
> [1]: https://stackoverflow.com/questions/64607839/is-there-an-equivalent-to-sparks-f-input-file-name-function-in-apache-flink
> [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
> [3]: https://issues.apache.org/jira/browse/FLINK-8275?jql=project%20%3D%20FLINK%20AND%20text%20~%20%22filename%22


signature.asc (849 bytes) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Can I get the filename as a column?

Ruben Laguna
Sure, I’ll write the JIRA issue

On Fri, 30 Oct 2020 at 13:27, Dawid Wysakowicz <[hidden email]> wrote:
I am afraid there is no such functionality available yet.

I think though it is a valid request. I think we can use the upcoming
FLIP-107 metadata columns for this purpose and expose the file name as
metadata column of a filesystem source.

Would you like to create a JIRA issue for it?

Best,

Dawid

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors

On 30/10/2020 13:21, Ruben Laguna wrote:
> I've asked this already on [stackoverflow][1]
>
> Is there anything equivalent to Spark's `f.input_file_name()` ?  I
> don't see anything that could be used in [system functions][2]
>
> I have a dataset where they embedded some information in the filenames
> (200k files) and I need to extract that as a new column.
>
> In Spark I could `
> .withColumn("id",f.split(f.reverse(f.split(f.input_file_name(),'/'))[0],'\.')[0])`
>  but I don't see how can I do the same with Flink. Is it possible?
>
> I don't see [any JIRA issue about it either][3]. Is it something that
> has already been discussed?
>
>
> [1]: https://stackoverflow.com/questions/64607839/is-there-an-equivalent-to-sparks-f-input-file-name-function-in-apache-flink
> [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
> [3]: https://issues.apache.org/jira/browse/FLINK-8275?jql=project%20%3D%20FLINK%20AND%20text%20~%20%22filename%22

--
/Rubén
Reply | Threaded
Open this post in threaded view
|

Re: Can I get the filename as a column?

Ruben Laguna
I've written [FLINK-19903][1].

I just read [FLIP-107][2] and [FLINK-15869][3] and I need to ask.... 

So assuming that  FLIP-107 / FLINK-15869  is implemented and Filesystem SQL connector modified to expose metadata (including path, and possible other stuff) , then  to use it I would need to write: 

    CREATE TABLE table1(
      `text` VARCHAR,  -- each CSV row is just a single text column
    ) WITH (
      'connector' = 'filesystem`,
      'path' = 'file://Users/ecerulm/mycsvfiles/',
      'format' = 'csv',
      'include.metadata' = 'path,size' -- tell filesystem connector to add 2 extra columns called `flink-filesystem-metadata.path` and `flink-filesystem-metadata.size`
    );
      
Is that right? 


 


On Fri, Oct 30, 2020 at 1:29 PM Ruben Laguna <[hidden email]> wrote:
Sure, I’ll write the JIRA issue

On Fri, 30 Oct 2020 at 13:27, Dawid Wysakowicz <[hidden email]> wrote:
I am afraid there is no such functionality available yet.

I think though it is a valid request. I think we can use the upcoming
FLIP-107 metadata columns for this purpose and expose the file name as
metadata column of a filesystem source.

Would you like to create a JIRA issue for it?

Best,

Dawid

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors

On 30/10/2020 13:21, Ruben Laguna wrote:
> I've asked this already on [stackoverflow][1]
>
> Is there anything equivalent to Spark's `f.input_file_name()` ?  I
> don't see anything that could be used in [system functions][2]
>
> I have a dataset where they embedded some information in the filenames
> (200k files) and I need to extract that as a new column.
>
> In Spark I could `
> .withColumn("id",f.split(f.reverse(f.split(f.input_file_name(),'/'))[0],'\.')[0])`
>  but I don't see how can I do the same with Flink. Is it possible?
>
> I don't see [any JIRA issue about it either][3]. Is it something that
> has already been discussed?
>
>
> [1]: https://stackoverflow.com/questions/64607839/is-there-an-equivalent-to-sparks-f-input-file-name-function-in-apache-flink
> [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
> [3]: https://issues.apache.org/jira/browse/FLINK-8275?jql=project%20%3D%20FLINK%20AND%20text%20~%20%22filename%22

--
/Rubén


--
/Rubén
Reply | Threaded
Open this post in threaded view
|

Re: Can I get the filename as a column?

Dawid Wysakowicz-2

No, you would do:

    CREATE TABLE table1(
      `text` VARCHAR,  -- each CSV row is just a single text column
      `path` VARCHAR METADATA VIRTUAL, -- where path is a name declared by the filesystem source
      `size` VARCHAR METADATA VIRTUAL -- where size is a name declared by the filesystem source
    ) WITH (
      'connector' = 'filesystem`,
      'format' = 'csv'
    );

On 30/10/2020 14:25, Ruben Laguna wrote:
I've written [FLINK-19903][1].

I just read [FLIP-107][2] and [FLINK-15869][3] and I need to ask.... 

So assuming that  FLIP-107 / FLINK-15869  is implemented and Filesystem SQL connector modified to expose metadata (including path, and possible other stuff) , then  to use it I would need to write: 

    CREATE TABLE table1(
      `text` VARCHAR,  -- each CSV row is just a single text column
    ) WITH (
      'connector' = 'filesystem`,
      'format' = 'csv',
      'include.metadata' = 'path,size' -- tell filesystem connector to add 2 extra columns called `flink-filesystem-metadata.path` and `flink-filesystem-metadata.size`
    );
      
Is that right? 


 


On Fri, Oct 30, 2020 at 1:29 PM Ruben Laguna <[hidden email]> wrote:
Sure, I’ll write the JIRA issue

On Fri, 30 Oct 2020 at 13:27, Dawid Wysakowicz <[hidden email]> wrote:
I am afraid there is no such functionality available yet.

I think though it is a valid request. I think we can use the upcoming
FLIP-107 metadata columns for this purpose and expose the file name as
metadata column of a filesystem source.

Would you like to create a JIRA issue for it?

Best,

Dawid

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors

On 30/10/2020 13:21, Ruben Laguna wrote:
> I've asked this already on [stackoverflow][1]
>
> Is there anything equivalent to Spark's `f.input_file_name()` ?  I
> don't see anything that could be used in [system functions][2]
>
> I have a dataset where they embedded some information in the filenames
> (200k files) and I need to extract that as a new column.
>
> In Spark I could `
> .withColumn("id",f.split(f.reverse(f.split(f.input_file_name(),'/'))[0],'\.')[0])`
>  but I don't see how can I do the same with Flink. Is it possible?
>
> I don't see [any JIRA issue about it either][3]. Is it something that
> has already been discussed?
>
>
> [1]: https://stackoverflow.com/questions/64607839/is-there-an-equivalent-to-sparks-f-input-file-name-function-in-apache-flink
> [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
> [3]: https://issues.apache.org/jira/browse/FLINK-8275?jql=project%20%3D%20FLINK%20AND%20text%20~%20%22filename%22

--
/Rubén


--
/Rubén

signature.asc (849 bytes) Download Attachment