Hi Deep,

You can try to set the line delimiter and field delimiter of the RowCsvInputFormat to a non-printing character (assuming there are no non-printing characters in the csv files). It will read all the content of a csv file into one Row, e.g.:

final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
String path = "test";
TypeInformation[] fieldTypes = new TypeInformation[]{
    BasicTypeInfo.STRING_TYPE_INFO};
RowCsvInputFormat csvFormat =
    new RowCsvInputFormat(new Path(path), fieldTypes);
csvFormat.setNestedFileEnumeration(true);
csvFormat.setDelimiter((char) 0);
csvFormat.setFieldDelimiter(String.valueOf((char) 0));
DataStream<Row> lines = env.readFile(csvFormat, path,
    FileProcessingMode.PROCESS_ONCE, -1);
lines.map(value -> value).print();
env.execute();

Then you can convert the content of the csv files to json manually.

Best,
Wei
Hi Deep,

Could you use the TextInputFormat which reads a file line by line? That way you can do the JSON parsing as part of a mapper which consumes the file lines.

Cheers,
Till

On Mon, Dec 7, 2020 at 1:05 PM Wei Zhong <[hidden email]> wrote:
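A minimal sketch of the per-line mapper logic Till describes, in plain Java outside of Flink (the class and method names are mine; in a Flink job this body would sit inside a `MapFunction<String, String>` applied to the TextInputFormat stream):

```java
public class LineToJson {

    // Turns one csv data line like "A,1" into a small json object {"A":"1"}.
    // Splitting with a limit of 2 keeps any commas in the value intact.
    static String toJson(String csvLine) {
        String[] parts = csvLine.split(",", 2);
        if (parts.length < 2) {
            throw new IllegalArgumentException("Malformed csv line: " + csvLine);
        }
        return "{\"" + parts[0] + "\":\"" + parts[1] + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(toJson("A,1")); // prints {"A":"1"}
    }
}
```

Note that this emits one json object per line; assembling all lines of one file into a single object (as in Deep's desired output) needs the whole file in hand, which is what the one-Row trick below the thread provides.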
Hi Deep,
Could you show your current code snippet? I have tried the csv file data on my local machine and it works fine, so I guess the problem might be elsewhere.

Best,
Wei

> On Dec 8, 2020, at 03:20, DEEP NARAYAN Singh <[hidden email]> wrote:
>
> Hi Wei and Till,
> Thanks for the quick reply.
>
> @Wei, I tried the code you suggested and it is working fine, but there is one case where it fails. Below is the csv input data format:
>
> Csv file data format:
> -------------------------------
> field_id,data,
> A,1
> B,3
> C,4
> D,9
> E,0,0,0,0
>
> Because the last row contains more than two values, it throws org.apache.flink.api.common.io.ParseException: Row too short: field_id,data,
>
> How can the above corner case be handled? Could you please suggest some way to handle this?
>
> @Till, could you please elaborate on what you are suggesting? In my use case I am dealing with multiple csv files under the given folder, and reading line by line using TextInputFormat and transforming with a map operator will not work. Correct me if I'm wrong.
>
> Thanks & Regards,
> -Deep
>
>
> On Mon, Dec 7, 2020 at 6:38 PM Till Rohrmann <[hidden email]> wrote:
> Hi Deep,
>
> Could you use the TextInputFormat which reads a file line by line? That way
> you can do the JSON parsing as part of a mapper which consumes the file
> lines.
>
> Cheers,
> Till
>
> On Mon, Dec 7, 2020 at 1:05 PM Wei Zhong <[hidden email]> wrote:
>
> > Hi Deep,
> >
> > (redirecting this to user mailing list as this is not a dev question)
> >
> > You can try to set the line delimiter and field delimiter of the
> > RowCsvInputFormat to a non-printing character (assuming there are no
> > non-printing characters in the csv files). It will read all the content
> > of a csv file into one Row, e.g.:
> >
> > final StreamExecutionEnvironment env =
> >     StreamExecutionEnvironment.getExecutionEnvironment();
> > String path = "test";
> > TypeInformation[] fieldTypes = new TypeInformation[]{
> >     BasicTypeInfo.STRING_TYPE_INFO};
> > RowCsvInputFormat csvFormat =
> >     new RowCsvInputFormat(new Path(path), fieldTypes);
> > csvFormat.setNestedFileEnumeration(true);
> > csvFormat.setDelimiter((char) 0);
> > csvFormat.setFieldDelimiter(String.valueOf((char) 0));
> > DataStream<Row> lines = env.readFile(csvFormat, path,
> >     FileProcessingMode.PROCESS_ONCE, -1);
> > lines.map(value -> value).print();
> > env.execute();
> >
> > Then you can convert the content of the csv files to json manually.
> >
> > Best,
> > Wei
> >
> > On Dec 7, 2020, at 19:10, DEEP NARAYAN Singh <[hidden email]> wrote:
> >
> > Hi Guys,
> >
> > Below is my code snippet, which reads all csv files under the given folder
> > row by row, but my requirement is to read one csv file at a time and convert
> > it to json, which will look like:
> > {"A":"1","B":"3","C":"4","D":9}
> >
> > Csv file data format:
> > -------------------------------
> > field_id,data,
> > A,1
> > B,3
> > C,4
> > D,9
> >
> > Code snippet:
> > --------------------------
> >
> > final StreamExecutionEnvironment env =
> >     StreamExecutionEnvironment.getExecutionEnvironment();
> > String path = "s3://messages/data/test/dev/2020-12-07/67241306/";
> > TypeInformation[] fieldTypes = new TypeInformation[]{
> >     BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO};
> > RowCsvInputFormat csvFormat = new RowCsvInputFormat(
> >     new Path(path), fieldTypes);
> > csvFormat.setSkipFirstLineAsHeader(true);
> > csvFormat.setNestedFileEnumeration(true);
> > DataStream<Row> lines = env.readFile(csvFormat, path,
> >     FileProcessingMode.PROCESS_ONCE, -1);
> > lines.map(value -> value).print();
> >
> > Any help is highly appreciated.
> >
> > Thanks,
> > -Deep
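Since the one-Row trick above hands over the entire file content as a single string, the manual csv-to-json conversion can be sketched in plain Java as below. This is an illustration, not Flink API: the class and method names are mine, values are emitted as quoted strings (unlike the unquoted 9 in Deep's example output), and rows with extra fields such as "E,0,0,0,0" are assumed to keep everything after the first comma as their value rather than failing:

```java
import java.util.StringJoiner;

public class CsvFileToJson {

    // Converts the full content of one csv file (header row followed by
    // "key,value" rows) into a single json object like {"A":"1","B":"3"}.
    // Splitting each row with a limit of 2 means rows with more than two
    // fields no longer cause a parse failure: the surplus commas stay in
    // the value.
    static String toJson(String fileContent) {
        StringJoiner json = new StringJoiner(",", "{", "}");
        String[] rows = fileContent.split("\\R"); // \R matches any line break
        for (int i = 1; i < rows.length; i++) {   // i = 1 skips the header row
            String row = rows[i].trim();
            if (row.isEmpty()) {
                continue;
            }
            String[] kv = row.split(",", 2);
            json.add("\"" + kv[0] + "\":\"" + (kv.length > 1 ? kv[1] : "") + "\"");
        }
        return json.toString();
    }

    public static void main(String[] args) {
        System.out.println(
            toJson("field_id,data,\nA,1\nB,3\nC,4\nD,9\nE,0,0,0,0"));
        // prints {"A":"1","B":"3","C":"4","D":"9","E":"0,0,0,0"}
    }
}
```

In a Flink job this would be the body of the map applied to the one-column Row stream, replacing the `value -> value` placeholder.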
Hi Deep,
It seems that the TypeInformation array in your code has 2 elements, but we only need one here. This approach treats the entire csv file as a Row with only one column, so there should be only one `BasicTypeInfo.STRING_TYPE_INFO` in the array. Using the TextInputFormat instead of the RowCsvInputFormat would also solve this problem.

If you have created your own InputFormat by extending the RowCsvInputFormat, you can get the current file path via `this.currentSplit.getPath()` in your class. Note that if you choose to fill the file path into the second column of the Row, you do not need to make the above changes, because in that case we really do need the TypeInformation array to contain two StringTypeInfo elements.

Best,
Wei
Hi Deep,
You can try to change `FileProcessingMode.PROCESS_ONCE` to `FileProcessingMode.PROCESS_CONTINUOUSLY`.

Best,
Wei