Handle deserialization error

Handle deserialization error

Jack Huang-2
Hi all,

I have a custom deserializer, which I pass to a Kafka source, to transform JSON strings into a Scala case class:
val events = env.addSource(new FlinkKafkaConsumer09[Event]("events", new JsonSerde(classOf[Event], new Event), kafkaProp))

There are times when a JSON message is malformed, in which case I want to catch the exception, log an error message, and move on to the next message without producing an event downstream. The DeserializationSchema interface doesn't seem to allow such behavior. How could I achieve this?

Thanks,
Jack
Re: Handle deserialization error

Yassin Marzouki

Hi Jack,

As Robert Metzger mentioned in a previous thread, there's an ongoing discussion about the issue in this JIRA: https://issues.apache.org/jira/browse/FLINK-3679.

A possible workaround is to use a SimpleStringSchema in the Kafka source, and chain it with a flatMap operator where you can use your custom deserializer and handle deserialization errors.
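Yassine's suggestion — read raw strings with SimpleStringSchema, then deserialize inside a flatMap that swallows failures — can be sketched without Flink dependencies as follows. Here `deserialize` is a hypothetical stand-in that parses integers instead of JSON, and a plain collection plays the role of the stream; the point is only the flatMap pattern of emitting zero elements for a malformed record:

```scala
import scala.util.{Try, Success, Failure}

// Stand-in for the custom deserializer; in the real job this would be
// the JSON-to-case-class logic, which may throw on malformed input.
def deserialize(raw: String): Int = raw.toInt

val rawMessages = Seq("1", "not-a-number", "3")

// flatMap emits zero or one element per input record, so a malformed
// message is logged and dropped instead of reaching downstream.
val events: Seq[Int] = rawMessages.flatMap { raw =>
  Try(deserialize(raw)) match {
    case Success(event) => Some(event)
    case Failure(e) =>
      println(s"Skipping malformed message '$raw': ${e.getMessage}")
      None
  }
}
```

In the actual Flink job the same body would go into a FlatMapFunction chained after the Kafka source, with the Collector called only on the Success branch.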

Best,
Yassine


Re: Handle deserialization error

Jack Huang-2
Hi Yassine,

For now my workaround is catching exceptions in my custom deserializer and producing a default object downstream. It would still be very nice to avoid this inefficiency by not producing an object at all.
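Jack's interim workaround can be sketched the same dependency-free way (again with a hypothetical integer parser standing in for the JSON deserializer, and 0 as an assumed sentinel value): the deserializer never throws, but every downstream consumer must then recognize and filter out the default object, which is the inefficiency he mentions.

```scala
// The deserializer catches the failure itself and substitutes a
// default object (here the sentinel 0), so the source never sees an
// exception. Caveat: the sentinel is indistinguishable from a
// legitimate 0, and it still travels downstream until filtered.
def safeDeserialize(raw: String, default: Int = 0): Int =
  try raw.toInt
  catch {
    case e: NumberFormatException =>
      println(s"Malformed message '$raw', emitting default: ${e.getMessage}")
      default
  }

val parsed = Seq("1", "oops", "3").map(safeDeserialize(_))
val events = parsed.filter(_ != 0) // every consumer must drop the sentinel
```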

Thanks,
Jack

