Hi All: I have define kafka connector Descriptor, and registe TabletEnv.connect(new Kafka() .field("rowtime", Types.SQL_TIMESTAMP) .inAppendMode().registerTableSource(tableName); {
"data": [ 18140781, ], "createTime": 1572577137596 } Exception as follows: Caused by: java.io.IOException: Failed to deserialize JSON object. at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:129) at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72) at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:203) Caused by: java.time.format.DateTimeParseException: Text '1553080631582' could not be parsed at index 0 at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949) at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777) at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$createTimestampConverter$1dee6515$1(JsonRowDeserializationSchema.java:334) at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232) at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:403) at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:382) at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232) at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:127) ... 7 more |
Hi Polarisary. Checked the flink codebase and your stacktraces, seems you need to format the timestamp as : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" On Fri, Nov 1, 2019 at 3:50 PM Polarisary <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |