Hi,
I am trying to parse this JSON message:

{"monitorId": 789, "deviceId": "abcd", "data": 144.0, "state": 2, "time_st": "2020-07-14 15:15:19.600000"}

using pyFlink 1.11 DDL with this code:

ddl_source = f"""

I used [1] for the DDL format and [2] for the timestamp string format. However, when I run this I get the following error:

Caused by: java.io.IOException: Failed to deserialize JSON '{"monitorId": 789, "deviceId": "abcd", "data": 144.0, "state": 2, "time_st": "2020-07-14 15:15:19.600000"}'.
    at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:126)
    at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:76)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Caused by: java.lang.NoSuchFieldError: SQL_TIMESTAMP_FORMAT

I believe I am using the correct TIMESTAMP format in the JSON message according to the documentation so can't figure out what could be the error. Any help would be appreciated!

Thanks,
Manas
Can you try changing the precision to 6, or changing the JSON so that it contains only 3 fractional digits? As described in the JSON docs [1], the expected default format for a timestamp is: yyyy-MM-dd HH:mm:ss.s{precision}

Best,
Dawid

On 14/07/2020 12:07, Manas Kale wrote:
In reply to this post by Manas Kale
Forget my previous message. This is most probably a class conflict. The SQL_TIMESTAMP_FORMAT field was added in 1.11, so it looks as if an old version of the TimeFormats class, from an earlier version of Flink, is being loaded.

Best,
Dawid

On 14/07/2020 12:07, Manas Kale wrote:
In reply to this post by Manas Kale
Hi Kale,

I think you're using the correct TIMESTAMP data type in the JSON format, and this should work properly. But from the log, it looks like you are using an old version of the `flink-json` dependency. Could you check that the version of `flink-json` is 1.11.0?

Best,
Leonard Xu
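
For anyone checking this on their own setup, here is a minimal sketch of one way to spot conflicting versions of a Flink JAR such as `flink-json`. It only inspects file names in a single directory. The function name `find_version_conflicts` and the file-name pattern are assumptions made for illustration, not part of Flink, and you need to point it at whichever directory your job actually loads its JARs from.

```python
import re
from collections import defaultdict
from pathlib import Path

# Assumed naming scheme: flink-<artifact>[_<scala version>]-<flink version>.jar,
# e.g. flink-json-1.11.0.jar or flink-sql-connector-kafka_2.11-1.11.0.jar.
# JARs with other naming schemes (such as -SNAPSHOT builds) are ignored.
JAR_PATTERN = re.compile(
    r"^(?P<artifact>flink-[a-z0-9-]+?)(?:_\d+\.\d+)?-(?P<version>\d+\.\d+(?:\.\d+)?)\.jar$"
)


def find_version_conflicts(jar_dir):
    """Return {artifact: sorted versions} for artifacts found in more than one version."""
    versions = defaultdict(set)
    for jar in Path(jar_dir).glob("*.jar"):
        match = JAR_PATTERN.match(jar.name)
        if match:
            versions[match.group("artifact")].add(match.group("version"))
    # Keep only artifacts that appear with at least two different versions.
    return {name: sorted(found) for name, found in versions.items() if len(found) > 1}
```

Called on a directory that contains both flink-json-1.10.0.jar and flink-json-1.11.0.jar, this returns {"flink-json": ["1.10.0", "1.11.0"]}. An empty result only means no conflict was visible from the file names in that one directory. It does not rule out an older class being bundled inside another JAR.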
Thanks for the quick replies, Dawid and Leonard... I had the flink-json JARs for both 1.10 and 1.11. I deleted the 1.10 one and now it works!

On Tue, Jul 14, 2020 at 4:17 PM Leonard Xu <[hidden email]> wrote: