Using Date or other types in a POJO?


Stefan Winterstein
Hi,

I'm new to Flink and just taking my first steps...

I want to parse a CSV file that contains a date and time as the first
field, then some values:

> 07.02.2015 49.9871 234.677 ...

So I’d like to use this POJO:

> import java.text.ParseException;
> import java.text.SimpleDateFormat;
> import java.util.Date;
>
> public class DataPoint
> {
>     private String dateStr; // String value of date
>     private Date date;      // the actual date
> ...
>
>     // note: SimpleDateFormat is not thread-safe, so sharing one static instance is risky
>     private static SimpleDateFormat dateFormat = new SimpleDateFormat("dd.MM.yyyy");
>
>     public DataPoint() {}
>
>     // String setter, converts to Date
>     public void setDateStr(String value) {
>         this.dateStr = value;
>         try {
>             this.date = dateFormat.parse(dateStr); // parse string and store date
>         } catch (ParseException e) {
>             e.printStackTrace();
>         }
>     }
>
>     public String getDateStr() {
>         return this.dateStr;
>     }
>
>     public Date getDate() {
>         return this.date;
>     }
>     …
> }
       
...and pass it to the CsvReader:

> DataSet<DataPoint> csvInput = env.readCsvFile(filename)
>                                 .pojoType(DataPoint.class, "dateStr", ...);

However, this fails with an exception:

> Exception in thread "main" java.lang.IllegalArgumentException: The type 'java.util.Date' is not supported for the CSV input format.
>       at org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldTypesGeneric(GenericCsvInputFormat.java:236)
>       at org.apache.flink.api.java.io.CsvInputFormat.setFieldTypes(CsvInputFormat.java:115)
>       at org.apache.flink.api.java.io.CsvInputFormat.<init>(CsvInputFormat.java:77)
>       at org.apache.flink.api.java.io.CsvInputFormat.<init>(CsvInputFormat.java:61)
>       at org.apache.flink.api.java.io.CsvReader.pojoType(CsvReader.java:295)
>       at de.dfki.iui.MyJob.main(MyJob.java:60)

I managed to work around this by storing the long value of
Date.getTime() instead of Date, but:

Do the POJO semantics really need to be that strict? Wouldn't it be
sufficient to have an appropriate getter/setter for the member
names given to pojoType()?
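A variant of the workaround that keeps every field a CSV-friendly type (here String and double) but still offers a Date on demand can be sketched as below. It is a minimal sketch, not Flink's prescribed pattern: it assumes Java 8's java.time, interprets dates as midnight UTC, and the names value1, value2, toTimestamp() and toDate() are hypothetical. Because the Date is computed in a method rather than in setDateStr, it does not matter whether the reader populates the fields through setters or directly.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Date;

public class DataPoint {
    // DateTimeFormatter is immutable and thread-safe, so one shared
    // static instance is fine (unlike a shared SimpleDateFormat).
    private static final DateTimeFormatter DATE_FORMAT =
            DateTimeFormatter.ofPattern("dd.MM.yyyy");

    // Only field types the CSV reader can handle: String and double.
    private String dateStr;  // raw date string, e.g. "07.02.2015"
    private double value1;   // hypothetical names for the numeric columns
    private double value2;

    public DataPoint() {}

    public String getDateStr() { return dateStr; }
    public void setDateStr(String dateStr) { this.dateStr = dateStr; }
    public double getValue1() { return value1; }
    public void setValue1(double value1) { this.value1 = value1; }
    public double getValue2() { return value2; }
    public void setValue2(double value2) { this.value2 = value2; }

    // Derived on demand from dateStr; not backed by a field.
    // Assumption: the date is interpreted as midnight UTC.
    public long toTimestamp() {
        return LocalDate.parse(dateStr, DATE_FORMAT)
                .atStartOfDay(ZoneOffset.UTC)
                .toInstant()
                .toEpochMilli();
    }

    // Same value as the long workaround: new Date(ms).getTime() == ms.
    public Date toDate() {
        return new Date(toTimestamp());
    }
}
```

For "07.02.2015" this yields 1423267200000 ms, the same long that Date.getTime() would return for 7 Feb 2015 00:00 UTC.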


Best regards,

-Stefan

Re: Using Date or other types in a POJO?

Maximilian Michels
Hi Stefan,

The problem is that the CSV parser does not know how to parse types other than the ones it supports. It would be nice if it supported a custom parser, either specified manually or included in the POJO class itself.

You can either change your POJO fields to supported types (like you already did), or first read your data into tuples (e.g. a Tuple3<String, Double, Double>, or a larger TupleN for more columns) and convert them to POJOs in a map operation. In the map operation you can implement your own parsing logic.
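The parsing step for the tuple-then-map approach might look roughly like this. To keep the sketch compilable with only the JDK, the Flink wiring is left out: in a job, convert() would be called from the map method of a MapFunction<Tuple3<String, Double, Double>, DataPoint>. The class name TupleToPojo, the field names value1/value2, the java.time usage and the midnight-UTC interpretation are all assumptions for illustration.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Date;

public class TupleToPojo {

    // Target POJO: it may hold a java.util.Date, because the CSV reader
    // only ever sees the String/Double tuple fields.
    public static class DataPoint {
        public Date date;
        public double value1;  // hypothetical field names
        public double value2;

        public DataPoint() {}
    }

    // Immutable and thread-safe, so it can be shared by parallel tasks.
    private static final DateTimeFormatter DATE_FORMAT =
            DateTimeFormatter.ofPattern("dd.MM.yyyy");

    // Custom parsing logic lives here instead of in a POJO setter.
    // In Flink: return convert(tuple.f0, tuple.f1, tuple.f2);
    // Assumption: the date is interpreted as midnight UTC.
    public static DataPoint convert(String dateStr, double value1, double value2) {
        DataPoint p = new DataPoint();
        p.date = Date.from(LocalDate.parse(dateStr, DATE_FORMAT)
                .atStartOfDay(ZoneOffset.UTC)
                .toInstant());
        p.value1 = value1;
        p.value2 = value2;
        return p;
    }
}
```

In the job, the tuples would come from something like `env.readCsvFile(filename).types(String.class, Double.class, Double.class)`, followed by a `map` that calls `convert`. The number of columns and the field delimiter (the sample line looks whitespace-separated) depend on the actual file.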

Best,
Max

On Thu, Jul 30, 2015 at 11:40 AM, Stefan Winterstein wrote:
> [...]