Flink 1.9 Sql Rowtime Error

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink 1.9 Sql Rowtime Error

Polarisary
Hi All:
I have define kafka connector Descriptor, and registe Table

tEnv.connect(new Kafka()
.version(
"universal")
.topic(tableName)
.startFromEarliest()
.property(
"zookeeper.connect", xxx")
.property(
"bootstrap.servers", xxx")
.property(
"group.id", xxx"))
.withFormat(
new Json().deriveSchema())
.withSchema(
new Schema()
                .field("rowtime", Types.SQL_TIMESTAMP)
.rowtime(new Rowtime()
.timestampsFromField("createTime")
.watermarksPeriodicBounded(300_000))
.field("data", Types.ROW(dataFieldTypes)))
        .inAppendMode().registerTableSource(tableName);


kafka input is:  
{
   "data": [
      18140781,
   ],
   "createTime": 1572577137596
}
Exception as follows:
Caused by: java.io.IOException: Failed to deserialize JSON object.
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:129)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:203)
Caused by: java.time.format.DateTimeParseException: Text '1553080631582' could not be parsed at index 0
at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$createTimestampConverter$1dee6515$1(JsonRowDeserializationSchema.java:334)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:403)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:382)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:127)
... 7 more

Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.9 Sql Rowtime Error

OpenInx
Hi Polarisary.

Checked the flink codebase and your stacktraces, seems you need to format the timestamp as :  "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"


On Fri, Nov 1, 2019 at 3:50 PM Polarisary <[hidden email]> wrote:
Hi All:
I have define kafka connector Descriptor, and registe Table

tEnv.connect(new Kafka()
.version(
"universal")
.topic(tableName)
.startFromEarliest()
.property(
"zookeeper.connect", xxx")
.property(
"bootstrap.servers", xxx")
.property(
"group.id", xxx"))
.withFormat(
new Json().deriveSchema())
.withSchema(
new Schema()
                .field("rowtime", Types.SQL_TIMESTAMP)
.rowtime(new Rowtime()
.timestampsFromField("createTime")
.watermarksPeriodicBounded(300_000))
.field("data", Types.ROW(dataFieldTypes)))
        .inAppendMode().registerTableSource(tableName);


kafka input is:  
{
   "data": [
      18140781,
   ],
   "createTime": 1572577137596
}
Exception as follows:
Caused by: java.io.IOException: Failed to deserialize JSON object.
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:129)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:203)
Caused by: java.time.format.DateTimeParseException: Text '1553080631582' could not be parsed at index 0
at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$createTimestampConverter$1dee6515$1(JsonRowDeserializationSchema.java:334)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:403)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:382)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:127)
... 7 more