I am reading a bunch of records from a CSV file. A record looks like this:
"4/1/2014 0:11:00",40.769,-73.9549,"B02512" I intend to treat these records as SQL Rows and then process. Here's the code: ---------------------------------------- package org.nirmalya.exercise import java.time.LocalDateTime import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.api.table.TableEnvironment import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.table._ import org.apache.flink.api.table.sources.CsvTableSource import org.apache.flink.api.scala.table.TableConversions import org.apache.flink.api.scala._ /** * Created by nirmalya on 4/2/17. */ object TrafficDataTrainer { def main(args: Array[String]): Unit = { case class Trip(timeOfPickUp: LocalDateTime, lat: Double, lon: Double, base: String) val env = ExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) val myDataStorePath = "/home/nirmalya/Downloads/Traffic" val csvTableSource = new CsvTableSource( myDataStorePath + "/traffic-raw-data-apr14.csv", Array("timeOfPickUp", "lat", "lon", "base"), ( Array[org.apache.flink.api.common.typeinfo.TypeInformation[_]]( Types.TIMESTAMP, Types.DOUBLE, Types.DOUBLE, Types.STRING ) ) ) tableEnv.registerTableSource("TrafficData",csvTableSource) val trafficTable = tableEnv.scan("TrafficData") val result = trafficTable.select("timeOfPickUp,lat,lon,base") val trafficDataSet = new TableConversions(result).toDataSet[Trip] trafficDataSet.collect().take(10).foreach(println) } } ---------------------------------------- At run time, the exception that is thrown is: ------------------------------------------------------ Exception in thread "main" java.lang.IllegalArgumentException: The type 'java.sql.Date' is not supported for the CSV input format. 
    at org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldsGeneric(GenericCsvInputFormat.java:306)
    at org.apache.flink.api.table.runtime.io.RowCsvInputFormat.<init>(RowCsvInputFormat.scala:52)
    at org.apache.flink.api.table.sources.CsvTableSource.createCsvInput(CsvTableSource.scala:99)
    at org.apache.flink.api.table.sources.CsvTableSource.getDataSet(CsvTableSource.scala:78)
    at org.apache.flink.api.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:55)
    at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:274)
    at org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139)
    at org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:41)
    at org.nirmalya.exercise.UberDataTrainer$.main(UberDataTrainer.scala:45)
    at org.nirmalya.exercise.UberDataTrainer.main(UberDataTrainer.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
------------------------------------------------------

I see that in org.apache.flink.api.common.io.GenericCsvInputFormat:303, the check fails because the stated type isn't among the known types. However, the constructor of CsvTableSource accepts Types.DATE as well as Types.TIMESTAMP (I tried both, and the exception is the same).

Could someone please point out where I am going wrong?

-- Nirmalya
Hi,
java.sql.Timestamp values have to have a format like "yyyy-mm-dd hh:mm:ss.[fff...]". In your case you need to read this field as a String and write your own scalar function for the parsing.

Regards,
Timo

On 04/02/17 at 17:46, nsengupta wrote:
> "4/1/2014 0:11:00",40.769,-73.9549,"B02512"
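The parsing logic that such a scalar function would wrap can be sketched with the JDK alone, no Flink types involved. This is a minimal sketch, assuming the raw data uses the US-style "M/d/yyyy H:mm:ss" layout (single-digit month and hour, as in the sample record); the object and method names are illustrative, not part of any Flink API:

```scala
import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object PickupTimeParser {
  // "M/d/yyyy H:mm:ss" tolerates the single-digit month/hour in the raw CSV;
  // Timestamp.valueOf(String) alone would reject it, since it expects the
  // strict "yyyy-mm-dd hh:mm:ss[.fff...]" layout Timo describes.
  private val fmt = DateTimeFormatter.ofPattern("M/d/yyyy H:mm:ss")

  // In a Flink job, this would be the body of the scalar function's eval().
  def eval(raw: String): Timestamp =
    Timestamp.valueOf(LocalDateTime.parse(raw, fmt))
}
```

Wrapped in a class extending Flink's ScalarFunction, this eval method could then be registered with the TableEnvironment and applied to the String column.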
Hello Timo,
Thanks for the clarification. This means that I cannot use CsvTableSource as I have in the example. Instead, I should:

* Write a custom scalar function to convert STRINGs to other datatypes as required
* Read the file as CSV input, with all fields as STRINGs
* Apply the scalar function as appropriate and map() to a desired DataSet type
* Convert the DataSet to a Table
* Use SQL to access the Table

Is my understanding correct?

-- Nirmalya
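The conversion step in the plan above (all-String fields mapped into a typed record) can be sketched without the Flink plumbing. A minimal sketch, reusing the Trip case class from the original program; the helper name and the "M/d/yyyy H:mm:ss" pattern are assumptions about the raw data, not anything Flink prescribes:

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

case class Trip(timeOfPickUp: LocalDateTime, lat: Double, lon: Double, base: String)

object TripMapper {
  // Lenient pattern for the single-digit month/hour seen in the sample record.
  private val fmt = DateTimeFormatter.ofPattern("M/d/yyyy H:mm:ss")

  // The function that would be passed to map() over the all-String rows
  // read in step 2 of the plan.
  def fromStrings(time: String, lat: String, lon: String, base: String): Trip =
    Trip(LocalDateTime.parse(time, fmt), lat.toDouble, lon.toDouble, base)
}
```

Applied to the sample record, fromStrings("4/1/2014 0:11:00", "40.769", "-73.9549", "B02512") yields a Trip whose DataSet could then be registered as a Table.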
Hi,

you can also use the CsvTableSource and read the DateTime fields as String. [1]

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/table_api.html#user-defined-scalar-functions

2017-02-07 3:16 GMT+01:00 nsengupta <[hidden email]>:
> Hello Timo,