Hello everyone,

I've always used the DataStream API and now I'm trying out the Table API to create a DataStream from a CSV, and I'm running into a couple of issues:

1) I'm reading a CSV with 7 fields in total, the 7th of which is a date serialized as a Spark TimestampType, written to the CSV like this: 2019-07-19T15:31:38.000+01:00. I've defined the TableSource like this:

val csvTableSource = CsvTableSource.builder()
  .path("sourcefile.csv")
  .fieldDelimiter(",")
  /* fields of Types.STRING */
  .field("time", Types.SQL_TIMESTAMP)
  .build()

I'm transforming the Table to a DataStream of type Event:

class Event { /* ... */ }

val ds: DataStream[Event] = tEnv.toAppendStream[Event](table)

But when reading from the CSV, the following parsing error occurs:

Caused by: org.apache.flink.api.common.io.ParseException: Parsing error for column 7 of row '......,2019-07-20T09:52:07.000+01:00' originated by SqlTimestampParser: NUMERIC_VALUE_FORMAT_ERROR.

So, I'm wondering: is it possible to set a DateFormat or something similar to make the parsing succeed? I've also tried Types.SQL_DATE and Types.SQL_TIME, but they fail with the same exception.

2) My first option was to make Event a case class, but with the same table definition I was having trouble with the conversion, with an error telling me that the "Arity of 7 fields was not compatible with the destination arity of 1, of type GenericType<Event>". What's the correct way to handle case classes? I changed to a plain class (which I believe uses the POJO serializer) and it works fine, but I'm still wondering how to make it work with case classes, which come in quite useful sometimes.

Thank you very much,
Federico

--
Federico D'Ambrosio
Hi Federico,

1) As far as I know, you can't currently set a format for timestamp parsing (see SqlTimestampParser: it just feeds your string to java.sql.Timestamp.valueOf, so your timestamp format must be compatible with what that method accepts, i.e. "yyyy-mm-dd hh:mm:ss[.f...]").

2) How do you define your case class? You have to define all of its fields in the parameter list and leave its body empty to make it work. For example:

case class Event(a: String, b: String, time: Timestamp)

On Wed, Jul 24, 2019 at 4:10 PM Federico D'Ambrosio <[hidden email]> wrote:
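[Editor's note: for reference, a minimal workaround sketch, not from the thread itself: declare the timestamp column as a plain string in the CsvTableSource and convert it after the table-to-stream conversion. The helper name toSqlTimestamp is illustrative; the ISO-8601 value from the question parses with java.time.OffsetDateTime.]

import java.sql.Timestamp
import java.time.OffsetDateTime

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.table.sources.CsvTableSource

// Read the 7th column as a string instead of Types.SQL_TIMESTAMP,
// since SqlTimestampParser cannot handle the ISO-8601 offset format.
val csvTableSource = CsvTableSource.builder()
  .path("sourcefile.csv")
  .fieldDelimiter(",")
  /* fields of Types.STRING */
  .field("time", Types.STRING)
  .build()

// Convert the string to a java.sql.Timestamp in a later map step:
def toSqlTimestamp(iso: String): Timestamp =
  // "2019-07-19T15:31:38.000+01:00" is ISO_OFFSET_DATE_TIME
  Timestamp.from(OffsetDateTime.parse(iso).toInstant)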
Hi Caizhi,

thank you for your response.

1) I see, I'll use a compatible string format.

2) I'm defining the case class like this:

case class cEvent(state: String, id: String, device: String, /* ... */)

I'm assuming I'm doing something wrong with the TypeInformation, since the table records are not being converted correctly. The precise error is the following:

Arity [7] of result [ArrayBuffer(String, String, String, String, String, String, Timestamp)] does not match the number [1] of requested type [GenericType<cEvent>].

I noticed there's a CaseClassTypeInfo which can be created from Types.CASECLASS[cEvent], but I'm not sure how to use it after defining the table.

Thank you,
Federico

On Wed, Jul 24, 2019 at 10:42 AM Caizhi Weng <[hidden email]> wrote:
--
Federico D'Ambrosio
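[Editor's note: a quick diagnostic sketch for this situation: check which TypeInformation the compiler derives for the case class. With only the streaming Scala implicits in scope it should be a CaseClassTypeInfo of arity 7, not the GenericType<cEvent> from the error above. Field names beyond the three quoted in the message are placeholders.]

import java.sql.Timestamp

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._

case class cEvent(state: String, id: String, device: String,
                  f4: String, f5: String, f6: String, // placeholder names
                  time: Timestamp)

object TypeCheck extends App {
  // createTypeInformation is the implicit macro from the scala package object;
  // for a proper case class it yields a CaseClassTypeInfo, not a GenericTypeInfo.
  val ti: TypeInformation[cEvent] = createTypeInformation[cEvent]
  println(ti)
}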
Hi Federico,

I can't reproduce the error in my local environment. Would you mind sharing your code and the full exception stack trace with us? That will help us diagnose the problem. Thanks.

On Wed, Jul 24, 2019 at 5:45 PM Federico D'Ambrosio <[hidden email]> wrote:
Hi Caizhi,

thank you for your response. The full exception is the following:

Exception in thread "main" org.apache.flink.table.api.TableException: Arity [7] of result [ArrayBuffer(String, String, String, String, String, String, Timestamp)] does not match the number [1] of requested type [GenericType<xautomata.cEvent>].
  at org.apache.flink.table.api.TableEnvironment.generateRowConverterFunction(TableEnvironment.scala:1165)
  at org.apache.flink.table.api.StreamTableEnvironment.getConversionMapper(StreamTableEnvironment.scala:423)
  at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:936)
  at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:866)
  at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:173)
  at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:149)
  at xautomata.StatePatterns$.main(StatePatterns.scala:119)
  at xautomata.StatePatterns.main(StatePatterns.scala)

The code is the following:

import org.apache.flink.streaming.api.TimeCharacteristic
/* ... rest of the code ... */

While writing this email and exporting the code, I noticed that what's actually causing the issue is an import: the code I pasted here works fine, but the error comes back if I additionally import:

import org.apache.flink.api.scala._

Could you please verify that the same happens to you? Is that import specifically for the Flink Batch API? IntelliJ was showing me that import as unused and I thought nothing of it, leaving it in, but I'm assuming it's causing some sort of collision with

import org.apache.flink.streaming.api.scala._

Thank you,
Federico

On Wed, Jul 24, 2019 at 12:38 PM Caizhi Weng <[hidden email]> wrote:
-- Federico D'Ambrosio
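[Editor's note: to close the thread out, here is a minimal end-to-end sketch of the working setup under the constraint Federico found: in a streaming job, use the org.apache.flink.streaming.api.scala._ wildcard import and leave out org.apache.flink.api.scala._, its batch counterpart, since both package objects provide the implicit createTypeInformation macro and having both in scope apparently makes the case class fall back to GenericType. This assumes the Flink 1.8-era Table API suggested by the stack trace; field names other than "state", "id", "device" and "time" are placeholders.]

import java.sql.Timestamp

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala._ // only this wildcard import, NOT org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sources.CsvTableSource

case class cEvent(state: String, id: String, device: String,
                  f4: String, f5: String, f6: String, // placeholder names
                  time: Timestamp)

object StatePatterns {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val source = CsvTableSource.builder()
      .path("sourcefile.csv")
      .fieldDelimiter(",")
      .field("state", Types.STRING)
      .field("id", Types.STRING)
      .field("device", Types.STRING)
      .field("f4", Types.STRING)
      .field("f5", Types.STRING)
      .field("f6", Types.STRING)
      .field("time", Types.SQL_TIMESTAMP) // only if the CSV value suits Timestamp.valueOf
      .build()

    tEnv.registerTableSource("events", source)

    // With a single createTypeInformation in scope, the implicit
    // TypeInformation[cEvent] is a CaseClassTypeInfo and the arity matches.
    val ds: DataStream[cEvent] = tEnv.toAppendStream[cEvent](tEnv.scan("events"))
    ds.print()

    env.execute("StatePatterns")
  }
}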