Flink Schema Validation - Kafka

Flink Schema Validation - Kafka

hemant singh
Hi Users,

Is there a way to do schema validation on read from Kafka in a Flink job?

I have a pipeline like this:

Kafka Topic Raw(json data) -> Kafka Topic Avro(avro data) -> Kafka Topic Transformed(avro data) -> Sink 

While reading from the Raw topic, I want to validate the schema so that, if the schema check fails, I can push the event to an error topic. I understand from the documentation[1] that events which cannot be deserialised are returned as null and the consumer moves on (failing the consumer does not help, since a retry would produce the same result).
Is there a way to filter out these records when an event cannot be deserialised?

'When encountering a corrupted message that cannot be deserialised for any reason, there are two options - either throwing an exception from the deserialize(...) method which will cause the job to fail and be restarted, or returning null to allow the Flink Kafka consumer to silently skip the corrupted message.'

Thanks,
Hemant

Re: Flink Schema Validation - Kafka

rmetzger0
Hi Hemant,

You could let the Kafka consumer deserialize your JSON data into a DataStream<String>, then use a custom ProcessFunction to parse each string as JSON.
In your custom function, you can handle errors more flexibly (for example, by emitting erroneous records through a side output).
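
As a rough sketch of the validate-and-route logic such a function could contain, here is a plain Python version with no Flink dependency. The schema (`id` as a string, `timestamp` as an integer) and the names `REQUIRED_FIELDS`, `validate` and `route` are hypothetical, chosen only for illustration. In an actual job the same checks would sit inside the ProcessFunction, with valid records going to the main output and rejected ones to a side output that feeds the error topic.

```python
import json
from typing import Any, Dict, List, Tuple

# Hypothetical schema: required field name -> expected Python type.
REQUIRED_FIELDS: Dict[str, type] = {"id": str, "timestamp": int}


def validate(raw: str) -> Tuple[bool, Any]:
    """Return (True, parsed_record) on success or (False, reason) on failure."""
    try:
        record = json.loads(raw)
    except json.JSONDecodeError as exc:
        return False, f"invalid JSON: {exc.msg}"
    if not isinstance(record, dict):
        return False, "top-level value is not a JSON object"
    for name, expected in REQUIRED_FIELDS.items():
        if name not in record:
            return False, f"missing field: {name}"
        # Exact type check, so that a JSON boolean is not accepted as an int.
        if type(record[name]) is not expected:
            return False, f"wrong type for field: {name}"
    return True, record


def route(messages: List[str]) -> Tuple[List[dict], List[dict]]:
    """Split raw messages into valid records and error entries.

    The first list plays the role of the main output, the second the
    role of the side output destined for the error topic.
    """
    valid: List[dict] = []
    errors: List[dict] = []
    for raw in messages:
        ok, result = validate(raw)
        if ok:
            valid.append(result)
        else:
            # Keep the raw payload so the bad event can be inspected later.
            errors.append({"raw": raw, "reason": result})
    return valid, errors
```

Keeping the raw string next to the rejection reason means nothing is silently dropped, unlike returning null from the deserializer.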

I hope this helps!

Best,
Robert

In reply to hemant singh's message of Wed, Mar 18, 2020 at 11:48 AM.

Re: Flink Schema Validation - Kafka

hemant singh
Hi Robert,

Thanks for your reply. This helps; I was looking in a similar direction.

Thanks,
Hemant

In reply to Robert Metzger's message of Wed, 18 Mar 2020 at 8:44 PM.