Hi Users,
Is there a way I can do schema validation on read from Kafka in a Flink job? I have a pipeline like this:

Kafka Topic Raw (JSON data) -> Kafka Topic Avro (Avro data) -> Kafka Topic Transformed (Avro data) -> Sink

While reading from the Raw topic I want to validate the schema, so that if the schema check fails I can push the event to an error topic. I understand from the documentation [1] that events which cannot be deserialised are returned as null and the consumer moves ahead (failing the consumer does not help, as the record would be retried with the same result). Is there a way I can filter out these records when they cannot be deserialised?

'When encountering a corrupted message that cannot be deserialised for any reason, there are two options - either throwing an exception from the deserialize(...) method which will cause the job to fail and be restarted, or returning null to allow the Flink Kafka consumer to silently skip the corrupted message.'

Thanks,
Hemant
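For reference, the contract quoted from the documentation looks roughly like the sketch below. This is only an illustration: the class name is made up, Jackson is assumed for the JSON check, and returning null is exactly the behaviour that makes the consumer skip the record instead of surfacing it.

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;

public class SkippingJsonDeserializationSchema implements DeserializationSchema<String> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public String deserialize(byte[] message) {
        try {
            // Check that the payload is well-formed JSON; a real schema check
            // (required fields, types) would also go here.
            MAPPER.readTree(message);
            return new String(message, StandardCharsets.UTF_8);
        } catch (Exception e) {
            // Option 2 from the docs: returning null makes the Flink Kafka consumer
            // silently skip the corrupted record, so it never reaches the error topic.
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}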
Hi Hemant,

You could let the Kafka consumer deserialize your JSON data into a DataStream<String>, and then use a custom ProcessFunction to parse the string as JSON. In your custom function you can handle errors more flexibly (for example, by routing erroneous records to a side output).

I hope this helps!

Best,
Robert

On Wed, Mar 18, 2020 at 11:48 AM hemant singh <[hidden email]> wrote:
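A minimal sketch of the approach Robert suggests above: read the raw topic as plain strings, parse and validate in a ProcessFunction, and send records that fail the check to a side output that can then be written to the error topic. The topic names, Kafka properties, and validation logic are placeholders, and Jackson is assumed for JSON parsing.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Properties;

public class RawTopicValidationJob {

    // Side output for records that fail the schema check.
    private static final OutputTag<String> INVALID = new OutputTag<String>("invalid-json") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "raw-validator");           // placeholder

        // Read the raw topic as plain strings so deserialization itself cannot drop records.
        DataStream<String> raw = env.addSource(
                new FlinkKafkaConsumer<>("raw-topic", new SimpleStringSchema(), props));

        SingleOutputStreamOperator<String> valid = raw.process(new ProcessFunction<String, String>() {
            private transient ObjectMapper mapper;

            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                if (mapper == null) {
                    mapper = new ObjectMapper();
                }
                try {
                    JsonNode node = mapper.readTree(value);
                    // Put the actual schema validation here (required fields, types, ...).
                    out.collect(node.toString());
                } catch (Exception e) {
                    // Bad record: route it to the side output instead of failing the job.
                    ctx.output(INVALID, value);
                }
            }
        });

        // The side output feeds the error topic (attach a Kafka sink here).
        DataStream<String> invalid = valid.getSideOutput(INVALID);

        // Continue with the Avro conversion / transformation of `valid`, and sink `invalid`.
        env.execute("raw-topic-validation");
    }
}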
Hi Robert,

Thanks for your reply. This helps; I was looking in a similar direction.

Thanks,
Hemant

On Wed, 18 Mar 2020 at 8:44 PM, Robert Metzger <[hidden email]> wrote: