Handling Kafka DeserializationSchema() exceptions

Yassin Marzouki
Hi all,

Is there a way to handle Kafka deserialization exceptions, for example when a JSON message is malformed?

I thought about extending the DeserializationSchema to emit null (or some other placeholder value) for malformed messages, but that may cause an NPE in a subsequent TimestampExtractor.
The other solution would be to do the deserialization in a subsequent flatMap operator, but it would be more convenient to do it directly in the consumer, because that makes timestamp extraction (with the new Kafka consumer timestamp extractor) easier.
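A minimal plain-Java sketch of the flatMap approach, assuming a hypothetical `Event` type and a toy regex parser that stands in for a real JSON library. It deliberately uses no Flink classes: `java.util.function.Consumer` plays the role of the Collector a flatMap function would receive, so a malformed message produces zero output records instead of an exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of "deserialize in a downstream flatMap": the source hands over raw strings,
// and a flatMap-style function emits nothing for malformed input instead of throwing.
// Consumer<Event> stands in for a Flink Collector; all names here are hypothetical.
final class FlatMapParsing {

    // Hypothetical event type; real messages would carry more fields.
    static final class Event {
        final long timestamp;
        final String value;

        Event(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Toy parser for a flat object like {"ts": 123, "value": "abc"}.
    private static final Pattern FLAT_JSON = Pattern.compile(
            "\\{\\s*\"ts\"\\s*:\\s*(\\d+)\\s*,\\s*\"value\"\\s*:\\s*\"([^\"]*)\"\\s*\\}");

    static Event parse(String raw) {
        Matcher m = FLAT_JSON.matcher(raw.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("malformed message: " + raw);
        }
        // Long.parseLong may throw NumberFormatException, a subclass of IllegalArgumentException.
        return new Event(Long.parseLong(m.group(1)), m.group(2));
    }

    // flatMap semantics: emit exactly one Event on success, nothing on failure.
    static void flatMap(String raw, Consumer<Event> out) {
        try {
            out.accept(parse(raw));
        } catch (IllegalArgumentException e) {
            // Malformed input is dropped; a real job might log it or count it in a metric.
        }
    }

    public static void main(String[] args) {
        List<Event> collected = new ArrayList<>();
        for (String raw : new String[] {
                "{\"ts\": 1000, \"value\": \"a\"}", "{not json", "{\"ts\": 2000, \"value\": \"b\"}"}) {
            flatMap(raw, collected::add);
        }
        System.out.println(collected.size()); // prints 2
    }
}
```

Because parsing happens after the source here, timestamps would have to be assigned downstream of this step rather than in the consumer, which is the inconvenience described above.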

Any suggestions?

Best,
Yassine
Reply | Threaded
Open this post in threaded view
|

Re: Handling Kafka DeserializationSchema() exceptions

rmetzger0
Hi Yassine,

there's an ongoing discussion about the issue in this JIRA: https://issues.apache.org/jira/browse/FLINK-3679.
Emitting null is not an option.
There are workarounds for the issue, but I don't think any of them is nice.
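Since emitting null is ruled out, one possible workaround (an assumption here, not necessarily one of those discussed in FLINK-3679) is to have the schema always return a non-null wrapper that records whether parsing succeeded. The sketch below is plain Java: `ParseResult`, `Event`, and the fallback-timestamp parameter are hypothetical names, and `deserialize` only mirrors the shape of `DeserializationSchema#deserialize(byte[])` rather than implementing Flink's interface.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of the "never return null" workaround: deserialization always yields a
// non-null ParseResult carrying either a parsed Event or the failed raw payload.
// Plain Java only; these names are hypothetical and not part of Flink's API.
final class WrappedParsing {

    // Hypothetical event type.
    static final class Event {
        final long timestamp;
        final String value;

        Event(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // The wrapper itself is never null; its `event` field is null only when parsing failed.
    static final class ParseResult {
        final Event event;
        final String raw; // original payload, kept for logging or a dead-letter sink

        private ParseResult(Event event, String raw) {
            this.event = event;
            this.raw = raw;
        }

        boolean isValid() {
            return event != null;
        }
    }

    // Toy parser for {"ts": 123, "value": "abc"}; a stand-in for a real JSON library.
    private static final Pattern FLAT_JSON = Pattern.compile(
            "\\{\\s*\"ts\"\\s*:\\s*(\\d+)\\s*,\\s*\"value\"\\s*:\\s*\"([^\"]*)\"\\s*\\}");

    // Reports malformed input through the wrapper instead of throwing or returning null.
    static ParseResult deserialize(byte[] message) {
        String raw = new String(message, StandardCharsets.UTF_8);
        Matcher m = FLAT_JSON.matcher(raw.trim());
        if (!m.matches()) {
            return new ParseResult(null, raw);
        }
        try {
            return new ParseResult(new Event(Long.parseLong(m.group(1)), m.group(2)), raw);
        } catch (NumberFormatException e) { // e.g. a timestamp that overflows a long
            return new ParseResult(null, raw);
        }
    }

    // Timestamp extraction that never dereferences a missing Event: invalid records
    // get a caller-supplied fallback timestamp (hypothetical policy).
    static long extractTimestamp(ParseResult result, long fallbackTimestamp) {
        return result.isValid() ? result.event.timestamp : fallbackTimestamp;
    }

    public static void main(String[] args) {
        String[] messages = {"{\"ts\": 1000, \"value\": \"a\"}", "{broken"};
        for (String msg : messages) {
            ParseResult r = deserialize(msg.getBytes(StandardCharsets.UTF_8));
            System.out.println(r.isValid() + " " + extractTimestamp(r, 42L));
        }
        // prints "true 1000", then "false 42"
    }
}
```

A filter after timestamp assignment could then drop or side-route the invalid wrappers. The cost of this design is that every operator up to that filter sees the wrapper type instead of the plain event.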

Re: Handling Kafka DeserializationSchema() exceptions

Yassin Marzouki
Good to know that there is already a JIRA issue, thanks!