Hi community,when I write the flink ddl sql like this:
CREATE TABLE kafka_src (
id varchar,
a varchar,
b TIMESTAMP,
c TIMESTAMP
)
with (
...
'format.type' = 'json',
'format.property-version' = '1',
'format.derive-schema' = 'true',
'update-mode' = 'append'
);
If the message is not the json format ,there is a error in the log。
My question is that how to deal with the message which it not json format?
My thought is that I can catch the exception in JsonRowDeserializationSchema deserialize() method,is there any parameters to do this?
Thanks your replay.