Flink 1.9 SQL Kafka Connector,Json format,how to deal with not json message?

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink 1.9 SQL Kafka Connector,Json format,how to deal with not json message?

LakeShen
Hi community,when I write the flink ddl sql like this:

CREATE TABLE kafka_src (
  id varchar,
  a varchar,
  b TIMESTAMP,
  c TIMESTAMP
)
  with (
   ...
    'format.type' = 'json',  
    'format.property-version' = '1',  
    'format.derive-schema' = 'true',  
    'update-mode' = 'append'
);

If the message is not the json format ,there is a error in the log。
My question is that how to deal with the message which it not json format?
My thought is that I can catch the exception in JsonRowDeserializationSchema deserialize() method,is there any parameters to do this?
Thanks your replay.

Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.9 SQL Kafka Connector,Json format,how to deal with not json message?

Jark Wu-3
Hi LakeShen,

I'm sorry there is no such configuration for json format currently.
I think it makes sense to add such configuration like 'format.ignore-parse-errors' in csv format. 
I created FLINK-15396[1] to track this. 

Best,
Jark


On Thu, 26 Dec 2019 at 11:44, LakeShen <[hidden email]> wrote:
Hi community,when I write the flink ddl sql like this:

CREATE TABLE kafka_src (
  id varchar,
  a varchar,
  b TIMESTAMP,
  c TIMESTAMP
)
  with (
   ...
    'format.type' = 'json',  
    'format.property-version' = '1',  
    'format.derive-schema' = 'true',  
    'update-mode' = 'append'
);

If the message is not the json format ,there is a error in the log。
My question is that how to deal with the message which it not json format?
My thought is that I can catch the exception in JsonRowDeserializationSchema deserialize() method,is there any parameters to do this?
Thanks your replay.