Re: Ignoring invalid values in KafkaSerializationSchema

Posted by Matthias
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Ignoring-invalid-values-in-KafkaSerializationSchema-tp38299p38329.html

Hi Yuval,
thanks for bringing this issue up. You're right: there is currently no error handling implemented for SerializationSchema. FLIP-124 [1] addressed this for DeserializationSchema, though. I created FLINK-19397 [2] to track this feature.

In the meantime, I cannot think of any solution other than filtering those rows out in a step before emitting the data to Kafka.
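For illustration, such a pre-filtering step could be a flatMap that attempts the conversion and drops records that fail; the class name and the convertToBytes helper below are placeholders for your own conversion logic, not existing Flink API:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// Drops rows whose conversion to byte[] fails, so only serializable
// payloads ever reach the Kafka sink.
public class DropUnserializableRows implements FlatMapFunction<Row, byte[]> {

    @Override
    public void flatMap(Row row, Collector<byte[]> out) {
        try {
            out.collect(convertToBytes(row));
        } catch (Exception e) {
            // Malformed value: drop the record instead of failing the job
            // (logging or counting the drop is probably a good idea).
        }
    }

    // Placeholder for the job's actual Row -> byte[] conversion logic.
    private byte[] convertToBytes(Row row) {
        throw new UnsupportedOperationException("replace with real conversion");
    }
}

The resulting byte[] stream can then be handed to the FlinkKafkaProducer with a trivial pass-through KafkaSerializationSchema, so the schema itself never sees malformed values.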

Best,
Matthias

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988
[2] https://issues.apache.org/jira/browse/FLINK-19397

On Wed, Sep 23, 2020 at 1:12 PM Yuval Itzchakov <[hidden email]> wrote:
Hi,

I'm using a custom KafkaSerializationSchema to write records to Kafka using FlinkKafkaProducer. The objects written are Rows coming from Flink's SQL API.

In some cases, converting the Row object to a byte[] fails due to malformed values. When that happens, I would like the custom serialization schema to drop the bad records and not send them through.

From the API, it is unclear how such failures should be handled. Given the following signature:

 ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);

From reading the code, there's no exception handling or null checking, which means that:

- If an exception is thrown, it will fail the entire job (this has happened to me in production).
- If null is returned, a null value will be pushed down to kafkaProducer.send, which is undesirable.
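For concreteness, a stripped-down sketch of such a schema could look like the following (the class name and the Row-to-byte[] conversion are placeholders, not the actual implementation):

import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RowSerializationSchema implements KafkaSerializationSchema<Row> {

    private final String topic;

    public RowSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Row element, @Nullable Long timestamp) {
        // Placeholder conversion; the real one can fail on malformed values.
        // The interface gives no way to say "skip this record": throwing here
        // fails the job, and returning null ends up in kafkaProducer.send(null).
        byte[] value = element.toString().getBytes(StandardCharsets.UTF_8);
        return new ProducerRecord<>(topic, value);
    }
}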

What are the options here?



--
Best Regards,
Yuval Itzchakov.