Hi community, when I write a Flink DDL statement like this:
CREATE TABLE kafka_src ( 
  id varchar,
  a varchar,
  b TIMESTAMP,
  c TIMESTAMP
)
  with ( 
   ...
    'format.type' = 'json',  
    'format.property-version' = '1',  
    'format.derive-schema' = 'true',  
    'update-mode' = 'append' 
);
If a message is not valid JSON, an error appears in the log.
My question is: how should I deal with messages that are not in JSON format?
My thought is that I could catch the exception in the deserialize() method of JsonRowDeserializationSchema. Is there a parameter to enable this?
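To make the idea concrete, here is a rough sketch of what I have in mind. It does not use the real Flink classes: Deserializer is a hypothetical stand-in for the deserialization schema interface, strictParse is a toy check standing in for the JSON parser, and Tolerant is a made-up wrapper name. It also assumes that the Kafka source skips a record when the schema returns null, which I have not verified for the SQL/DDL path.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class TolerantDeserializationSketch {

    /** Hypothetical stand-in for a deserialization schema; NOT the real Flink interface. */
    interface Deserializer<T> {
        T deserialize(byte[] message) throws IOException;
    }

    /** Hypothetical wrapper: delegates to another deserializer and returns null instead of throwing. */
    static final class Tolerant<T> implements Deserializer<T> {
        private final Deserializer<T> inner;

        Tolerant(Deserializer<T> inner) {
            this.inner = inner;
        }

        @Override
        public T deserialize(byte[] message) {
            try {
                return inner.deserialize(message);
            } catch (IOException | RuntimeException e) {
                // Assumption: returning null tells the source to skip this record.
                return null;
            }
        }
    }

    /** Toy parser: accepts only text wrapped in braces, otherwise fails like a strict JSON schema would. */
    static String strictParse(byte[] message) throws IOException {
        String text = new String(message, StandardCharsets.UTF_8).trim();
        if (!(text.startsWith("{") && text.endsWith("}"))) {
            throw new IOException("Failed to deserialize JSON object: " + text);
        }
        return text;
    }

    public static void main(String[] args) {
        Deserializer<String> tolerant = new Tolerant<>(TolerantDeserializationSketch::strictParse);
        // A well-formed message passes through unchanged.
        System.out.println(tolerant.deserialize("{\"id\":\"1\"}".getBytes(StandardCharsets.UTF_8)));
        // A malformed message yields null instead of an exception.
        System.out.println(tolerant.deserialize("not json".getBytes(StandardCharsets.UTF_8)));
    }
}
```

With something like this the job would keep running on bad input, but the bad records would be dropped silently, so I would probably also want to log or count them.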
Thanks for your reply.