Table API: java.sql.DateTime is not supported;

nsengupta
I am reading a bunch of records from a CSV file. A record looks like this:

"4/1/2014 0:11:00",40.769,-73.9549,"B02512"

I intend to treat these records as SQL Rows and then process them.

Here's the code:
----------------------------------------
package org.nirmalya.exercise

import java.time.LocalDateTime

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.table.TableEnvironment
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.table._
import org.apache.flink.api.table.sources.CsvTableSource
import org.apache.flink.api.scala.table.TableConversions
import org.apache.flink.api.scala._
/**
  * Created by nirmalya on 4/2/17.
  */
object TrafficDataTrainer {

  def main(args: Array[String]): Unit = {

    case class Trip(timeOfPickUp: LocalDateTime, lat: Double, lon: Double, base: String)

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val myDataStorePath = "/home/nirmalya/Downloads/Traffic"

    val csvTableSource = new CsvTableSource(
      myDataStorePath + "/traffic-raw-data-apr14.csv",
      Array("timeOfPickUp", "lat", "lon", "base"),
      Array[TypeInformation[_]](
        Types.TIMESTAMP,
        Types.DOUBLE,
        Types.DOUBLE,
        Types.STRING
      )
    )

    tableEnv.registerTableSource("TrafficData",csvTableSource)

    val trafficTable = tableEnv.scan("TrafficData")

    val result = trafficTable.select("timeOfPickUp,lat,lon,base")

    val trafficDataSet = new TableConversions(result).toDataSet[Trip]

    trafficDataSet.collect().take(10).foreach(println)
  }
}
----------------------------------------

At run time, the exception that is thrown is:

------------------------------------------------------
Exception in thread "main" java.lang.IllegalArgumentException: The type 'java.sql.Date' is not supported for the CSV input format.
        at org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldsGeneric(GenericCsvInputFormat.java:306)
        at org.apache.flink.api.table.runtime.io.RowCsvInputFormat.<init>(RowCsvInputFormat.scala:52)
        at org.apache.flink.api.table.sources.CsvTableSource.createCsvInput(CsvTableSource.scala:99)
        at org.apache.flink.api.table.sources.CsvTableSource.getDataSet(CsvTableSource.scala:78)
        at org.apache.flink.api.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:55)
        at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:274)
        at org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139)
        at org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:41)
        at org.nirmalya.exercise.UberDataTrainer$.main(UberDataTrainer.scala:45)
        at org.nirmalya.exercise.UberDataTrainer.main(UberDataTrainer.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

------------------------------------------------------

I see that in org.apache.flink.api.common.io.GenericCsvInputFormat:303, the check fails because the stated type isn't one of the known types. However, the constructor of CsvTableSource accepts Types.DATE as well as Types.TIMESTAMP (I tried both, and the exception is the same).

Could someone please point out where I am going wrong?

-- Nirmalya

Re: Table API: java.sql.DateTime is not supported;

Timo Walther
Hi,

java.sql.Timestamp values have to be in the format "yyyy-mm-dd hh:mm:ss.[fff...]". In your case, you need to read the field as a String and write your own scalar function to parse it.

Regards,
Timo


On 04/02/17 at 17:46, nsengupta wrote:
> "4/1/2014 0:11:00",40.769,-73.9549,"B02512"
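
A small plain-Scala check of the point above, assuming only the JDK: `java.sql.Timestamp.valueOf` accepts the JDBC escape format but rejects the value from the question, while a custom `java.time` pattern parses it. The pattern `M/d/yyyy H:mm:ss` is an assumption inferred from the single sample record, and the object and method names are hypothetical. This is the kind of logic a user-defined scalar function would wrap; it is a sketch, not Flink's own parsing code.

```scala
import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import scala.util.Try

object TimestampFormatCheck {
  // Assumed pattern for values such as "4/1/2014 0:11:00":
  // month, day and hour are not zero-padded.
  val pickupFormat: DateTimeFormatter = DateTimeFormatter.ofPattern("M/d/yyyy H:mm:ss")

  // Timestamp.valueOf(String) only accepts the JDBC escape format
  // yyyy-[m]m-[d]d hh:mm:ss[.f...] and throws IllegalArgumentException otherwise.
  def acceptedByValueOf(s: String): Boolean = Try(Timestamp.valueOf(s)).isSuccess

  // Parses with the custom pattern, then converts to java.sql.Timestamp.
  def parsePickup(s: String): Timestamp =
    Timestamp.valueOf(LocalDateTime.parse(s, pickupFormat))

  def main(args: Array[String]): Unit = {
    println(acceptedByValueOf("2014-04-01 00:11:00")) // true
    println(acceptedByValueOf("4/1/2014 0:11:00"))    // false
    println(parsePickup("4/1/2014 0:11:00"))
  }
}
```

If this logic is moved into a Flink function, note that `DateTimeFormatter` does not implement `java.io.Serializable`; keeping it in a Scala `object` (as here) or creating it inside the function avoids having to serialize it with the function.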

Re: Table API: java.sql.DateTime is not supported;

nsengupta
Hello Timo,

Thanks for the clarification.

This means that I cannot use CsvTableSource the way I have in the example. Instead, I should:

 *   Write a custom scalar function to convert STRINGs to other data types as required
 *   Read the file as CsvInput, with all fields as STRINGs
 *   Apply the scalar function as appropriate and Map() to the desired DataSet type
 *   Convert the DataSet to a Table
 *   Use SQL to access the Table

Is my understanding correct?

-- Nirmalya
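
The Map() step of this plan could look roughly like the sketch below, assuming the CSV reader has already split each line into four Strings and stripped the quotes. `TripRecord`, `TripParsing` and `toTrip` are hypothetical names, and the date pattern is again inferred from the one sample record. Unlike `Trip` in the question, `TripRecord` holds a `java.sql.Timestamp`, the Java type behind `Types.TIMESTAMP`.

```scala
import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical typed record; the pickup time is a java.sql.Timestamp.
case class TripRecord(timeOfPickUp: Timestamp, lat: Double, lon: Double, base: String)

object TripParsing {
  // Assumed pattern for values such as "4/1/2014 0:11:00".
  private val pickupFormat = DateTimeFormatter.ofPattern("M/d/yyyy H:mm:ss")

  // Converts one record read as four Strings into a typed TripRecord.
  // Throws DateTimeParseException or NumberFormatException on malformed input.
  def toTrip(fields: (String, String, String, String)): TripRecord = {
    val (time, lat, lon, base) = fields
    TripRecord(
      Timestamp.valueOf(LocalDateTime.parse(time, pickupFormat)),
      lat.toDouble,
      lon.toDouble,
      base
    )
  }

  def main(args: Array[String]): Unit = {
    val trip = toTrip(("4/1/2014 0:11:00", "40.769", "-73.9549", "B02512"))
    println(trip)
  }
}
```

With the DataSet API, a function like this would be passed to `map` on the result of something like `readCsvFile[(String, String, String, String)]`; check how quoted fields are handled by the reader in the Flink version in use.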

Re: Table API: java.sql.DateTime is not supported;

Fabian Hueske-2
Hi,

you can also use the CsvTableSource and read the date-time fields as Strings. This directly gives you a Table. You can then implement a user-defined scalar function [1] to parse the String into a TIMESTAMP (java.sql.Timestamp).

The benefit is that you stay in the Table API / SQL and don't have to deal with the DataSet or DataStream API and the conversion.

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/table_api.html#user-defined-scalar-functions

On 2017-02-07 3:16 GMT+01:00, nsengupta wrote:
> This means that I *cannot use CsvTableSource*, as I have, in the example.



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Table-API-java-sql-DateTime-is-not-supported-tp11439p11480.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.