Yes, you need to send the errors in band.
There are two ways to implement it:
A) Use a KeyedSerializationSchema. You then need to define only one Kafka producer, because the KeyedSerializationSchema.getTargetTopic() method lets you choose the target topic for each record.
(operator generating errors) --> stream with errors --> sink
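A minimal sketch of the topic selection in option A. To keep it compilable without Flink on the classpath, KeyedSchema below is a local stand-in that mirrors the three methods of KeyedSerializationSchema; in a real job you would implement the Flink interface instead. The Tagged type and the topic names "events" and "errors" are made up for illustration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical record type: isError marks the record, payload carries the data.
final class Tagged {
    final boolean isError;
    final String payload;

    Tagged(boolean isError, String payload) {
        this.isError = isError;
        this.payload = payload;
    }
}

// Local stand-in (assumption) mirroring the methods of Flink's KeyedSerializationSchema.
interface KeyedSchema<T> {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}

// Routes error records to one topic and regular records to another.
final class ErrorRoutingSchema implements KeyedSchema<Tagged> {
    private final String regularTopic;
    private final String errorTopic;

    ErrorRoutingSchema(String regularTopic, String errorTopic) {
        this.regularTopic = regularTopic;
        this.errorTopic = errorTopic;
    }

    @Override
    public byte[] serializeKey(Tagged element) {
        return null; // this sketch writes records without a key
    }

    @Override
    public byte[] serializeValue(Tagged element) {
        return element.payload.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(Tagged element) {
        // The per-record decision that makes a single producer sufficient.
        return element.isError ? errorTopic : regularTopic;
    }
}
```

With this, new ErrorRoutingSchema("events", "errors") would send a record flagged as an error to "errors" and everything else to "events".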
B) Use two sinks (in that case you probably don't need the KeyedSerializationSchema).
(operator generating errors) --> (filter) --> stream without errors --> sink
                             --> (filter) --> error stream --> sink
To distinguish error records from regular data, you could use, for example, a Tuple2<> with a flag in one field and the record in the other.
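The two-filter split in option B could look roughly like the sketch below. It uses plain Java collections rather than a Flink DataStream so that it runs on its own; in a job, the same two predicates would go into two filter() calls on the operator's output stream. Pair is a hypothetical stand-in for a Tuple2<Boolean, String>, and the class and method names are made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for Tuple2<Boolean, String>: f0 = error flag, f1 = payload.
final class Pair {
    final boolean f0;
    final String f1;

    Pair(boolean f0, String f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

final class SplitSketch {
    // First filter: keep records whose flag is false (goes to the regular sink).
    static List<String> regular(List<Pair> in) {
        return in.stream()
                 .filter(p -> !p.f0)
                 .map(p -> p.f1)
                 .collect(Collectors.toList());
    }

    // Second filter: keep records whose flag is true (goes to the error sink).
    static List<String> errors(List<Pair> in) {
        return in.stream()
                 .filter(p -> p.f0)
                 .map(p -> p.f1)
                 .collect(Collectors.toList());
    }
}
```

Both filters read the same input, so every record ends up in exactly one of the two outputs.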
Regards,
Robert