DDL TIMESTAMP(3) parsing issue


DDL TIMESTAMP(3) parsing issue

Manas Kale
Hi,
I am trying to parse this JSON message:
{"monitorId": 789, "deviceId": "abcd", "data": 144.0, "state": 2, "time_st": "2020-07-14 15:15:19.600000"}
using pyFlink 1.11 DDL with this code:
ddl_source = f"""
    CREATE TABLE {INPUT_TABLE} (
        `monitorId` STRING,
        `deviceId` STRING,
        `state` INT,
        `time_st` TIMESTAMP(3),
        WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND,
        `data` DOUBLE
    ) WITH (
        'connector' = 'kafka',
        'topic' = '{INPUT_TOPIC}',
        'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
        'format' = 'json'
    )
"""
I used [1] for the DDL format and [2] for the timestamp string format. However, when I run this I get the following error:
Caused by: java.io.IOException: Failed to deserialize JSON '{"monitorId": 789, "deviceId": "abcd", "data": 144.0, "state": 2, "time_st": "2020-07-14 15:15:19.600000"}'.
at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:126)
at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:76)
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Caused by: java.lang.NoSuchFieldError: SQL_TIMESTAMP_FORMAT

I believe I am using the correct TIMESTAMP format in the JSON message according to the documentation, so I can't figure out what the error could be.

Any help would be appreciated!


Thanks,
Manas

Re: DDL TIMESTAMP(3) parsing issue

Dawid Wysakowicz-2

Can you try changing the precision to 6, or changing the JSON format to produce only 3 fractional digits? As described in the JSON docs [1], the expected default format for a timestamp is: yyyy-MM-dd HH:mm:ss.s{precision}
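
For example, the producer-side fix could look like this (just a sketch; assuming the message is built in Python, since you are on pyFlink):

    from datetime import datetime

    # %f always renders 6 fractional digits (microseconds); dropping the
    # last 3 characters leaves exactly 3, which matches TIMESTAMP(3):
    # "2020-07-14 15:15:19.600"
    ts = datetime(2020, 7, 14, 15, 15, 19, 600000)
    time_st = ts.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]

The other fix is a one-word change in the DDL: declare the column as `time_st` TIMESTAMP(6).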

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#json-timestamp-format-standard

Re: DDL TIMESTAMP(3) parsing issue

Dawid Wysakowicz-2
In reply to this post by Manas Kale

Forget my previous message. This is most probably a class conflict. The SQL_TIMESTAMP_FORMAT field was added in 1.11, so it looks as if an old version of the TimeFormats class from an earlier Flink version is on your classpath.
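
A quick way to check from the pyFlink side (a sketch; this assumes you dropped the connector/format jars into pyFlink's lib directory, which is where they are picked up by default):

    import glob
    import os
    import pyflink

    # List every flink-json jar visible to pyFlink. More than one version
    # here (e.g. 1.10 next to 1.11) means an old TimeFormats class can win
    # on the classpath and cause exactly this NoSuchFieldError.
    lib_dir = os.path.join(os.path.dirname(pyflink.__file__), "lib")
    for jar in sorted(glob.glob(os.path.join(lib_dir, "flink-json*.jar"))):
        print(jar)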

Best,

Dawid

Re: DDL TIMESTAMP(3) parsing issue

Leonard Xu
In reply to this post by Manas Kale
Hi, Kale

I think you're using the correct TIMESTAMP data type for the JSON format, and this should work properly.
But from the log it looks like you are using an old version of the `flink-json` dependency. Could you check that your `flink-json` version is 1.11.0?

Best,
Leonard Xu
 


Re: DDL TIMESTAMP(3) parsing issue

Manas Kale
Thanks for the quick replies, Dawid and Leonard. I had flink-json JARs for both 1.10 and 1.11 on the classpath. I deleted the 1.10 one and now it works!
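
For the record, an alternative that avoids stray jars in the lib directory is to pin the exact versions through the pipeline.jars option (a sketch; t_env and the paths below are placeholders for my actual table environment and jar locations):

    # Semicolon-separated file:// URLs; point these at the real 1.11.0 jars.
    t_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///path/to/flink-sql-connector-kafka_2.11-1.11.0.jar;"
        "file:///path/to/flink-json-1.11.0.jar")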
