Hello,

I have multiple files (file1, file2, file3), each being CSV and having different columns and data. The column headers are finite and we know their format. I would like to take them and parse them based on the column structure. I already have the parsers, e.g.:

file1 has columns (id, firstname, lastname)
file2 has columns (id, name)
file3 has columns (id, name_1, name_2, name_3, name_4)

I would like to take all those files, read them, parse them and output objects to a sink as Person { id, fullName }.

Example files would be:

file1:
------
id, firstname, lastname
33, John, Smith
55, Labe, Soni

file2:
------
id, name
5, Mitr Kompi
99, Squi Masw

file3:
------
id, name_1, name_2, name_3, name_4
1, Peter, Hov, Risti, Pena
2, Rii, Koni, Ques,,

Expected output of my program would be:

Person { 33, John Smith }
Person { 55, Labe Soni }
Person { 5, Mitr Kompi }
Person { 99, Squi Masw }
Person { 1, Peter Hov Risti Pena }
Person { 2, Rii Koni Ques }

What I do now is:

My code (very simplified) is:

env.readFile().flatMap(new MyParser()).addSink(new MySink())

MyParser receives the rows one by one in string format. This means that when I run with parallelism > 1, I receive rows from any of the files and cannot tell which file a given line comes from.

What I would like to do is:

Be able to figure out which file I am reading from. Since I only know the file type based on the first row (the columns), I need to either send the first row to MyParser() or send a tuple <1st row of file being read, current row of file being read>. Another option I can think of is to have some keyed function based on the first row, but I am not sure how to achieve that using readFile.

Is there a way I can achieve this?

Regards,
Nikola
Hi Nikola,

you could implement a custom SourceFunction that handles this in one of two ways:

- If the files are small (< 10 MB), send each file as a record, then process it in a subsequent flatMap operation.
- If the files are large, split the work across the parallel sources and read them serially in the SourceFunction.

The other option (which I have not fully thought through) is using the readFile method with a custom FileInputFormat implementation:

DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath)

You basically have to override the "FileInputFormat.createInputSplits()" method to get the CSV schema, and pass it along to the splits so that they can properly parse the data.
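For the small-file case, the per-file processing that would sit inside the subsequent flatMap can be sketched in plain Java, without any Flink dependency. This is only an illustration under assumptions: the class WholeFileParser, the helper names and the three lambdas are hypothetical stand-ins for the parsers Nikola already has, and the file content is assumed to arrive as one String whose first non-blank line is the header.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper (not a Flink class): turns the full text of one CSV file
// into Person objects, choosing the row parser from the file's header line.
class WholeFileParser {

    static final class Person {
        final int id;
        final String fullName;

        Person(int id, String fullName) {
            this.id = id;
            this.fullName = fullName;
        }

        @Override
        public String toString() {
            return "Person { " + id + ", " + fullName + " }";
        }
    }

    // Normalized header -> row parser. The lambdas are stand-ins for existing parsers.
    static final Map<String, Function<String[], Person>> PARSERS = new HashMap<>();

    static {
        PARSERS.put("id,firstname,lastname",
                c -> new Person(Integer.parseInt(c[0]), c[1] + " " + c[2]));
        PARSERS.put("id,name",
                c -> new Person(Integer.parseInt(c[0]), c[1]));
        // Join whichever name_N columns are present and non-empty.
        PARSERS.put("id,name_1,name_2,name_3,name_4",
                c -> new Person(Integer.parseInt(c[0]),
                        Arrays.stream(c, 1, c.length)
                                .filter(s -> !s.isEmpty())
                                .collect(Collectors.joining(" "))));
    }

    // Splits a CSV line on commas and trims each cell (no quoting support).
    static String[] cells(String line) {
        return Arrays.stream(line.split(",")).map(String::trim).toArray(String[]::new);
    }

    // Parses one whole file: the first non-blank line selects the parser for the rest.
    static List<Person> parseFile(String content) {
        List<Person> out = new ArrayList<>();
        Function<String[], Person> parser = null;
        for (String line : content.split("\\R")) {
            if (line.trim().isEmpty()) {
                continue; // skip blank lines
            }
            if (parser == null) {
                String header = String.join(",", cells(line));
                parser = PARSERS.get(header);
                if (parser == null) {
                    throw new IllegalArgumentException("Unknown header: " + header);
                }
            } else {
                out.add(parser.apply(cells(line)));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String file3 = "id, name_1, name_2, name_3, name_4\n"
                + "1, Peter, Hov, Risti, Pena\n"
                + "2, Rii, Koni, Ques,,\n";
        // Prints "Person { 1, Peter Hov Risti Pena }" and "Person { 2, Rii Koni Ques }"
        parseFile(file3).forEach(System.out::println);
    }
}
```

Because the header travels with the rows inside one record, the parser choice no longer depends on which parallel task receives the data. The sketch holds a whole file in memory, so it only fits the small-file case; for anything larger, the readFile route with a custom FileInputFormat described above is the alternative.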
This second approach is a little more involved to understand, but the Flink framework will do the heavy lifting of file system handling, splitting and fault tolerance.

Best,
Robert

On Thu, May 28, 2020 at 4:52 PM Nikola Hrusov <[hidden email]> wrote: