DataStream API: Best way for reading csv file

Jan Oelschlegel

Hi,

I'm looking for a convenient way to read a CSV file with the DataStream API in Flink 1.11, without going through the Table/SQL API first.

This is my first approach:

val typeInfo = TypeInformation.of(classOf[CovidEvent]).asInstanceOf[PojoTypeInfo[CovidEvent]]

val csvInputFormat = new PojoCsvInputFormat(new Path(sourcePath), "\n", ",", typeInfo)

env.readFile(csvInputFormat, sourcePath)

But then I get the following error, because I want to parse a string field directly into a Date:

Exception in thread "main" java.lang.IllegalArgumentException: The type 'java.util.Date' is not supported for the CSV input format.

Or do I have to read the field as a String, map it with a parser such as Jackson, and then call assignTimestampsAndWatermarks on the result?

Thanks for your help.

Best,

Jan

Re: DataStream API: Best way for reading csv file

Dawid Wysakowicz-2

Hi Jan,

First of all, I would recommend the Table API for processing structured data.

However, if you are set on using the DataStream API: the CsvInputFormat supports the java.sql.Date type, so you can try that. Alternatively, and this is what I would suggest, read the date field as a String and then parse it yourself, e.g. into one of the java.time classes. You could do that with the RowCsvInputFormat or the TupleCsvInputFormat, for example.
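To illustrate the second suggestion, here is a minimal sketch of the parsing step only, written against the Scala standard library and java.time so that it does not depend on any particular Flink call. The fields of CovidEvent, the ISO date layout (e.g. 2021-01-22), and the names CovidEventParser, fromFields and eventTimeMillis are assumptions made for illustration; the thread does not show the real class or file layout.

```scala
import java.time.{LocalDate, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical event shape: the thread does not show CovidEvent's fields.
final case class CovidEvent(country: String, reportDate: LocalDate, cases: Int)

object CovidEventParser {
  // Assumed date layout in the CSV file, e.g. 2021-01-22.
  private val dateFormat: DateTimeFormatter = DateTimeFormatter.ISO_LOCAL_DATE

  // Turns the raw String fields of one CSV record into a typed event.
  // Throws DateTimeParseException / NumberFormatException on malformed input.
  def fromFields(country: String, rawDate: String, rawCases: String): CovidEvent =
    CovidEvent(
      country.trim,
      LocalDate.parse(rawDate.trim, dateFormat),
      rawCases.trim.toInt
    )

  // Epoch milliseconds at the start of the report day (UTC),
  // i.e. the unit an event-time timestamp is expressed in.
  def eventTimeMillis(event: CovidEvent): Long =
    event.reportDate.atStartOfDay(ZoneOffset.UTC).toInstant.toEpochMilli
}
```

If every column is read as a String, a function like fromFields could be applied in a map step on the resulting stream, and eventTimeMillis could supply the timestamp when assignTimestampsAndWatermarks is called afterwards. Treating the report date as midnight UTC is a design choice of this sketch, not something the data necessarily implies.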

Hope that helps.

Best,

Dawid

In reply to Jan Oelschlegel's message of 22/01/2021 18:39.
