Hi all,
I'm a new user in the Flink community. This tool sounds great for loading files of millions of rows into a pgsql db for a new project. As I read the docs and examples, I couldn't find a proper use case of CSV loading into pgsql. The file I want to load doesn't follow the same structure as the table: I have to delete some columns, and also build a JSON string out of several others, prior to loading into pgsql. I plan to use the Flink 1.5 Java API and a batch process.

Is the DataSet class able to strip some columns out of the records I load, or should I iterate over each record to delete the columns? Same question for making a JSON string from several columns of the same record, e.g. json_column = {"field1":col1, "field2":col2...}

I work with files of 20 million records and it sounds pretty inefficient to iterate over each record. Can someone tell me if this is possible, or if I have to change my mind about this?
Thanks in advance, all the best
François Lacombe
Hi François,

If I understand correctly, you can use SQL or the Table API to solve your problem. Since you want to project a subset of the columns from the source, a columnar storage format like Parquet/ORC would be efficient. Currently, an ORC table source is supported in Flink; you can find more details here [1]. There are also many other table sources [2] to choose from. With a TableSource, you can read the data, register it as a Table, and perform table operations through SQL [3] or the Table API [4]. To make a JSON string from several columns, you can write a user-defined function [5].

I also found an OrcTableSourceITCase [6] which I think may be helpful for you.

Best, Hequn

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#orctablesource
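To illustrate the user-defined function idea in [5], here is a minimal sketch of a scalar UDF that builds the JSON string (the class name and field names are illustrative, and the values are not JSON-escaped):

    import org.apache.flink.table.functions.ScalarFunction;

    // Builds a JSON string from two columns; add eval() overloads for more columns.
    public class ToJson extends ScalarFunction {
        public String eval(String col1, String col2) {
            // Naive concatenation; real code should escape quotes in the values.
            return "{\"field1\":\"" + col1 + "\",\"field2\":\"" + col2 + "\"}";
        }
    }

It would be registered with tableEnv.registerFunction("toJson", new ToJson()) and then called as toJson(col1, col2) from SQL or the Table API.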
Hi Hequn,

The Table API is really great. I will use it, and certainly love it, to solve the issues I mentioned before.

One subsequent question regarding the Table API: I've got my CSV files and Avro schemas that describe them. As my users can send erroneous files, inconsistent with the schemas, I want to check that a file's structure is right before processing it. I see that CsvTableSource allows me to define CSV fields. Will it then check that the columns actually exist in the file and throw an exception if not? Or is there any other way in Apache Avro to check whether a CSV file is consistent with a given schema?

Big thanks for putting me on the Table API's way :)

Best regards,
François Lacombe
Hi François,

> I see that CsvTableSource allows me to define CSV fields. Will it then check that the columns actually exist in the file and throw an exception if not?

Currently, CsvTableSource doesn't support Avro. CsvTableSource uses fieldDelim and rowDelim to parse the data. But there is a workaround: read each line of the data as a single big column, i.e., the source table has only one column. Afterward, you can use a udtf [1] to split each line. In the udtf you can throw data away, or throw exceptions, as you wish.

> I want to check that a file's structure is right before processing it.

If you want to skip the whole file when the schema is erroneous, you can write a user-defined table source, and you will probably have to write a user-defined InputFormat. You can refer to the AvroInputFormat [2] as an example.
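As an illustration of the workaround, such a udtf could look roughly like this (a minimal sketch, assuming a two-column, semicolon-delimited file; the class name is made up):

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;

    public class SplitLine extends TableFunction<Row> {

        public void eval(String line) {
            String[] cols = line.split(";");
            if (cols.length != 2) {
                return; // drop the malformed row, or throw to fail the job
            }
            Row row = new Row(2);
            row.setField(0, cols[0]);
            row.setField(1, cols[1]);
            collect(row);
        }

        @Override
        public TypeInformation<Row> getResultType() {
            return new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
                                   BasicTypeInfo.STRING_TYPE_INFO);
        }
    }

After registering it on the table environment, it can be applied to the single-column source with LATERAL TABLE(splitLine(line)) in SQL.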
Hi Hequn,

As CsvTableSource sounds optimized for CSV parsing, I won't question it too much. Your second point sounds really better: I can extend the CsvTableSource with extra Avro schema checking capabilities. Then, if the CSV file header doesn't match the Avro schema specification, it throws an exception prior to parsing the whole CSV. Right?

I plan to check the quality of the data in two independent steps:
1. Check the "file structure", i.e. the number of columns and their names, mainly by dealing with the header row. Any error leads to rejection of the whole file with a Java exception.
2. Check each row with udfs to find the ones that aren't consistent with the Avro schema. Any error is logged, but the rest of the file is processed and loaded.

I guess the first step doesn't require an AvroInputFormat, only a simple Avro Schema object, and the second would be more efficient with an AvroInputFormat. Am I right?

Thanks for the useful inputs, all the best
François
Hi François,

> I guess the first step doesn't require an AvroInputFormat

The first step does require an AvroInputFormat, because the source needs an AvroInputFormat to read the data once it matches the schema.

> the second would be more efficient with an AvroInputFormat

I think the second step doesn't need an AvroInputFormat, because we use the udtf to parse the data. The first approach is more efficient if you want to skip the whole file when the schema is erroneous, while the second approach has to read each line of the erroneous file. The cost of the first approach is that we have to implement a user-defined AvroInputFormat and make it able to skip the whole file.

Best, Hequn
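The shape of such a format might be something like the following (a rough sketch, not from the thread; the actual validation is only indicated by a comment):

    import java.io.IOException;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.core.fs.FileInputSplit;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.avro.AvroInputFormat;

    // Hypothetical subclass that fails fast in open(), before any record
    // is read, so an erroneous file is rejected as a whole.
    public class ValidatingAvroInputFormat extends AvroInputFormat<GenericRecord> {

        public ValidatingAvroInputFormat(Path file) {
            super(file, GenericRecord.class);
        }

        @Override
        public void open(FileInputSplit split) throws IOException {
            super.open(split);
            // Compare the file's schema with the expected one here and
            // throw an IOException to abort the job if they don't match.
        }
    }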
Hi Hequn,
Yes, I do. By skipping the whole file, I mean "throw an exception to stop the process and inform the user that the file is invalid for a given reason", not "the process goes fully right and imports 0 rows".
Then we agree on this.

Is there any plan to give Avro schemas a better role in Flink in future versions? Avro schemas are perfect for building a CsvTableSource, with code like:

    for (Schema.Field field : sch.getFields()) {
        // Check that the csv file header actually contains a column for this schema field
        if (!csv_headers.contains(field.name())) {
            throw new NoSuchFieldException(field.name());
        }
        // Declare the field in the source builder
        src_builder.field(field.name(), primitiveTypes.get(field.schema().getType()));
    }

All the best
François
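The primitiveTypes map and src_builder referenced above are not defined in the mail; a plausible sketch of them, assuming Flink 1.5 and Avro primitive types only (the path and delimiter are illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.avro.Schema;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.table.sources.CsvTableSource;

    Map<Schema.Type, TypeInformation<?>> primitiveTypes = new HashMap<>();
    primitiveTypes.put(Schema.Type.STRING,  BasicTypeInfo.STRING_TYPE_INFO);
    primitiveTypes.put(Schema.Type.INT,     BasicTypeInfo.INT_TYPE_INFO);
    primitiveTypes.put(Schema.Type.LONG,    BasicTypeInfo.LONG_TYPE_INFO);
    primitiveTypes.put(Schema.Type.FLOAT,   BasicTypeInfo.FLOAT_TYPE_INFO);
    primitiveTypes.put(Schema.Type.DOUBLE,  BasicTypeInfo.DOUBLE_TYPE_INFO);
    primitiveTypes.put(Schema.Type.BOOLEAN, BasicTypeInfo.BOOLEAN_TYPE_INFO);

    CsvTableSource.Builder src_builder = CsvTableSource.builder()
        .path("file:///path/to/input.csv")
        .fieldDelimiter(";");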
Hi François,

> Is there any plan to give Avro schemas a better role in Flink in future versions?

I haven't heard about Avro support for CSV. You can open a JIRA for it, and maybe also contribute it to Flink :-)
OK Hequn,

I'll open two JIRA issues for this, and maybe propose a draft of a CsvTableSource class handling Avro schemas: FLINK-9813 and FLINK-9814.

Thank you for your answers and best regards
François