If I am processing a stream in the following manner:

    val stream = env.addSource(consumer).name("KafkaStream")
      .keyBy(x => (x.obj.ID1(), x.obj.ID2(), x.obj.ID3()))
      .flatMap(new FlatMapProcessor)

and extracting the IDs fails because of deserialization problems, the job crashes with a 'Could not extract key' error. How can I trap this cleanly? The only approach I can think of is to validate the IDs in the deserialization schema passed to the Kafka consumer's constructor and trap any problems there. Is that the preferred way, or is there a better one?
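One way to trap this, sketched here in plain Scala so it runs without a Flink cluster, is to compute the composite key inside a `scala.util.Try` before `keyBy`, keep the records whose key extracts cleanly, and set the others aside. `Payload`, `Event`, `SafeKey`, and the map-backed `ID1()`/`ID2()`/`ID3()` accessors are hypothetical stand-ins for the question's record type. In a real job the same `Try` would presumably sit in a `flatMap` (or a `ProcessFunction` that emits failures to a side output) placed ahead of `keyBy`, so the key selector only ever sees records that already passed.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for the question's `x.obj`: the ID accessors throw
// when a field is missing, mimicking a partially deserialized payload.
final class Payload(ids: Map[String, String]) {
  private def get(name: String): String =
    ids.getOrElse(name, throw new IllegalStateException(s"missing $name"))
  def ID1(): String = get("ID1")
  def ID2(): String = get("ID2")
  def ID3(): String = get("ID3")
}

// Hypothetical record type carrying the payload, like the question's `x`.
final case class Event(obj: Payload)

object SafeKey {
  type Key = (String, String, String)

  // Build the composite key, capturing any exception instead of letting it
  // escape (in Flink, an escaping exception here surfaces as "Could not extract key").
  def extractKey(e: Event): Try[Key] =
    Try((e.obj.ID1(), e.obj.ID2(), e.obj.ID3()))

  // Split records into (key, event) pairs that are safe to key by,
  // and (event, error) pairs that should be logged or routed elsewhere.
  def partition(events: Seq[Event]): (Seq[(Key, Event)], Seq[(Event, Throwable)]) = {
    val results = events.map(e => e -> extractKey(e))
    val good = results.collect { case (e, Success(k)) => (k, e) }
    val bad  = results.collect { case (e, Failure(t)) => (e, t) }
    (good, bad)
  }

  def main(args: Array[String]): Unit = {
    val ok  = Event(new Payload(Map("ID1" -> "a", "ID2" -> "b", "ID3" -> "c")))
    val bad = Event(new Payload(Map("ID1" -> "a")))
    val (good, failed) = partition(Seq(ok, bad))
    println(s"keyed: ${good.map(_._1)}") // keyed: List((a,b,c))
    failed.foreach { case (_, t) => println(s"dropped: ${t.getMessage}") } // dropped: missing ID2
  }
}
```

Validating in the deserialization schema, as the question suggests, is also a reasonable choice: the Flink Kafka connector documentation describes returning `null` from `deserialize(...)` as the way to make the (legacy) `FlinkKafkaConsumer` skip a corrupted record instead of failing the job. Doing the check just before `keyBy` instead keeps the rejected records available, for example to send to a side output for inspection.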