Filter columns of a csv file with Flink


Filter columns of a csv file with Flink

françois lacombe
Hi all,

I'm new to the Flink community. This tool sounds great for loading files with millions of rows into a pgsql database for a new project.

Reading the docs and examples, I couldn't find a proper use case of csv loading into pgsql.
The file I want to load doesn't follow the same structure as the table: I have to drop some columns and build a json string from several others before loading into pgsql.

I plan to use Flink 1.5 Java API and a batch process.
Is the DataSet class able to strip some columns out of the records I load, or should I iterate over each record to delete the columns?

Same question for building a json string from several columns of the same record?
E.g. json_column = {"field1":col1, "field2":col2...}

I work with files of 20 million rows, and it sounds pretty inefficient to iterate over every record.
Can someone tell me if this is possible, or do I have to change my mind about this approach?


Thanks in advance, all the best

François Lacombe


Re: Filter columns of a csv file with Flink

Hequn Cheng
Hi francois,

If I understand correctly, you can use SQL or the Table API to solve your problem.
Since you want to project only some of the columns from the source, a columnar storage format like Parquet/ORC would be efficient. Currently, an ORC table source is supported in Flink; you can find more details here[1]. There are also many other table sources[2] to choose from. With a TableSource, you can read the data, register it as a Table, and run table operations through SQL[3] or the Table API[4].
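As a rough illustration of that batch flow with the 1.5 Java API (the file path, delimiter, field names and types below are assumptions for the example, not something from this thread):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.types.Row;

public class CsvProjectionJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Declare the csv columns; columns you don't select later are simply dropped.
        CsvTableSource source = CsvTableSource.builder()
                .path("/tmp/data.csv")
                .fieldDelimiter(";")
                .field("col1", Types.STRING)
                .field("col2", Types.INT)
                .field("col3", Types.STRING)
                .build();
        tEnv.registerTableSource("csv_input", source);

        // Keep only the columns needed for the pgsql table.
        Table projected = tEnv.scan("csv_input").select("col1, col2");

        // The Table can then be converted to a DataSet<Row> and written out,
        // e.g. with a JDBC output format for pgsql.
        tEnv.toDataSet(projected, Row.class).print();
    }
}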

To build a json string from several columns, you can write a user-defined function[5].
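For instance, a scalar function along these lines could assemble the json column (the field names are just an example, and a real implementation would likely use a json library instead of string concatenation):

import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical UDF turning two columns into a json string.
public class ToJson extends ScalarFunction {
    public String eval(String field1, Integer field2) {
        return "{\"field1\":\"" + field1 + "\", \"field2\":" + field2 + "}";
    }
}

// Register it and use it in a projection, e.g.:
//   tEnv.registerFunction("to_json", new ToJson());
//   tEnv.scan("csv_input").select("col1, to_json(col1, col2) as json_column");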

I also found OrcTableSourceITCase[6], which I think may be helpful for you.

Best, Hequn






Re: Filter columns of a csv file with Flink

françois lacombe
Hi Hequn,

The Table API is really great.
I will use it, and certainly love it, to solve the issues I mentioned before.

One subsequent question regarding the Table API:
I've got my csv files and Avro schemas that describe them.
As my users can send erroneous files that are inconsistent with the schemas, I want to check whether the file structure is right before processing it.
I see that CsvTableSource allows defining csv fields. Will it check whether the columns actually exist in the file and throw an exception if not?

Or is there any other way, with Apache Avro, to check whether a csv file is consistent with a given schema?

Big thanks for putting me on the Table API's way :)

Best regards,

François Lacombe







Re: Filter columns of a csv file with Flink

Hequn Cheng
Hi francois,

I see that CsvTableSource allows defining csv fields. Will it check whether the columns actually exist in the file and throw an exception if not?
Currently, CsvTableSource doesn't support Avro. CsvTableSource uses fieldDelim and rowDelim to parse the data. There is a workaround, though: read each line of the data as a single big column, i.e., the source table has only one column. Afterwards, you can use a udtf[1] to split each line. You can discard records or throw exceptions in the udtf as you wish.
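A sketch of that workaround, assuming a two-column csv with ';' as delimiter (the class name and the column layout are illustrative):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Hypothetical udtf splitting a whole csv line into typed columns.
public class SplitLine extends TableFunction<Row> {
    public void eval(String line) {
        String[] parts = line.split(";");
        if (parts.length != 2) {
            // malformed record: drop it here (or log / throw, as needed)
            return;
        }
        collect(Row.of(parts[0], Integer.valueOf(parts[1])));
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return new RowTypeInfo(Types.STRING, Types.INT);
    }
}

// After tEnv.registerFunction("split_line", new SplitLine()), the lines can be
// expanded in SQL with:
//   SELECT col1, col2 FROM raw_lines, LATERAL TABLE(split_line(line)) AS T(col1, col2)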

I want to check if files structure is right before processing them.
If you want to skip the whole file when the schema is erroneous, you can write a user-defined table source, and you will probably have to write a user-defined InputFormat. You can refer to AvroInputFormat[2] as an example.





Re: Filter columns of a csv file with Flink

françois lacombe
Hi Hequn,

As CsvTableSource sounds optimized for csv parsing, I won't question it too much.

Your second point sounds much better.
I can extend CsvTableSource with extra Avro schema checking capabilities. Then, if the csv file header doesn't match the Avro schema, it throws an exception before parsing the whole csv.
Right?

I plan to check the quality of the data in two independent steps:
1. Check the "file structure" (number of columns and their names), mainly by dealing with the header row. Any error leads to rejection of the whole file with a Java exception.
2. Check each row with udfs to catch the ones that aren't consistent with the Avro schema. Any error is logged, but the rest of the file is processed and loaded (a rough sketch of such a check follows).
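For step 2, a minimal sketch of such a per-row check (the function name and the "log then drop" policy are only an illustration, not an agreed design):

import org.apache.avro.Schema;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical UDF: true when a raw csv value can be parsed as the Avro type
// declared for its column; rows returning false can be filtered out and logged.
public class MatchesAvroType extends ScalarFunction {
    public boolean eval(String rawValue, String avroTypeName) {
        try {
            switch (Schema.Type.valueOf(avroTypeName)) {
                case STRING:
                    return true;
                case INT:
                    Integer.parseInt(rawValue);
                    return true;
                case DOUBLE:
                    Double.parseDouble(rawValue);
                    return true;
                default:
                    return false;
            }
        } catch (NumberFormatException e) {
            return false;
        }
    }
}

// e.g. table.filter("matchesAvroType(col2, 'INT')") after registering the function.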

I guess the first step doesn't require an AvroInputFormat but only a plain Avro Schema object, and the second would be more efficient with an AvroInputFormat. Am I right?

Thanks for useful inputs, all the best

François




Re: Filter columns of a csv file with Flink

Hequn Cheng
Hi francois,

I guess the first step doesn't require an AvroInputFormat
The first step requires an AvroInputFormat, because the source needs an AvroInputFormat to read the Avro data if the data matches the schema.

the second would be more efficient with an AvroInputFormat
I think the second step doesn't need an AvroInputFormat, because we use the udtf to parse the data.

I think the first approach is more efficient if you want to skip the whole file when the schema is erroneous, while the second approach has to read each line of the erroneous file. The cost of the first approach is that we have to implement a user-defined AvroInputFormat and make it able to skip the whole file.

Best, Hequn




Re: Filter columns of a csv file with Flink

françois lacombe
Hi Hequn,

2018-07-10 3:47 GMT+02:00 Hequn Cheng <[hidden email]>:
Maybe I misunderstand you. So you don't want to skip the whole file?
Yes, I do.
By "skipping the whole file" I mean "throw an exception to stop the process and inform the user that the file is invalid for a given reason", not "the process runs through and imports 0 rows".

If so, then "extending CsvTableSource and providing the Avro schema to the constructor without creating a custom AvroInputFormat" is ok.

Then we agree on this.
Is there any plan to give Avro schemas a bigger role in Flink in future versions?
Avro schemas are perfect for building a CsvTableSource, with code like:

for (Schema field_nfo : sch.getTypes()) {
    // Test whether the csv file header actually contains a field matching the schema
    if (!csv_headers.contains(field_nfo.getName())) {
        throw new NoSuchFieldException(field_nfo.getName());
    }

    // Declare the field in the source builder
    src_builder.field(field_nfo.getName(), primitiveTypes.get(field_nfo.getType()));
}
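For context, a more self-contained sketch of that idea, assuming a record-style Avro schema whose fields mirror the csv header (hence getFields() rather than getTypes(); primitiveTypes, csv_headers, the delimiter and the helper class are illustrative, not part of Flink or Avro):

import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.sources.CsvTableSource;

public class SchemaCheckedCsvSource {

    public static CsvTableSource build(File avroSchemaFile, List<String> csv_headers, String csvPath)
            throws Exception {
        // Illustrative mapping from Avro primitive types to Flink type information.
        Map<Schema.Type, TypeInformation<?>> primitiveTypes = new HashMap<>();
        primitiveTypes.put(Schema.Type.STRING, Types.STRING);
        primitiveTypes.put(Schema.Type.INT, Types.INT);
        primitiveTypes.put(Schema.Type.LONG, Types.LONG);
        primitiveTypes.put(Schema.Type.DOUBLE, Types.DOUBLE);

        // Parse the record-style Avro schema that describes the csv file.
        Schema sch = new Schema.Parser().parse(avroSchemaFile);

        CsvTableSource.Builder src_builder = CsvTableSource.builder().path(csvPath).fieldDelimiter(";");
        for (Schema.Field field_nfo : sch.getFields()) {
            // Reject the whole file if a declared field is missing from the csv header.
            if (!csv_headers.contains(field_nfo.name())) {
                throw new NoSuchFieldException(field_nfo.name());
            }
            src_builder.field(field_nfo.name(), primitiveTypes.get(field_nfo.schema().getType()));
        }
        return src_builder.build();
    }
}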

All the best

François



On Mon, Jul 9, 2018 at 11:03 PM, françois lacombe <[hidden email]> wrote:
Hi Hequn,

2018-07-09 15:09 GMT+02:00 Hequn Cheng <[hidden email]>:
The first step requires an AvroInputFormat, because the source needs an AvroInputFormat to read the Avro data if the data matches the schema.

I don't want Avro data, I just want to check whether my csv file has the same fields as defined in a given Avro schema.
Processing should stop if, and only if, I find missing columns.

A record which doesn't match the schema (mainly the types) should be rejected and logged in a dedicated file, but the processing can go on.

How about extending CsvTableSource and providing the Avro schema to the constructor, without creating a custom AvroInputFormat?


François



Re: Filter columns of a csv file with Flink

Hequn Cheng
Hi francois,

Is there any plan to give Avro schemas a bigger role in Flink in future versions?
I haven't heard about Avro support for csv. You can open a jira for it. Maybe also contribute it to Flink :-)






Re: Filter columns of a csv file with Flink

françois lacombe
Ok Hequn,

I'll open 2 Jira issues for this, and maybe propose a draft of a CsvTableSource class handling Avro schemas:
FLINK-9813 and FLINK-9814

Thank you for your answers and best regards

François
