Hi all,

To deal with corrupted messages that occasionally leak into the data source, we implemented a custom serializer class (registered as a default Kryo serializer), shown below, that catches exceptions. The custom serializer returns null from the read(...) method when it hits an exception while reading. With this implementation the serializer may silently drop records, and one concern is that it may drop too many records before we notice and take action. What is the best practice for handling this? The serializer processes one record at a time. Will reading a corrupted record make the serializer fail to process the next valid record?

public class CustomTBaseSerializer extends TBaseSerializer {
    private static final Logger LOG = LoggerFactory.getLogger(CustomTBaseSerializer.class);

    @Override
    public void write(Kryo kryo, Output output, TBase tBase) {
        try {
            super.write(kryo, output, tBase);
        } catch (Throwable t) {
            LOG.error("Failed to write due to unexpected Throwable", t);
        }
    }

    @Override
    public TBase read(Kryo kryo, Input input, Class<TBase> tBaseClass) {
        try {
            return super.read(kryo, input, tBaseClass);
        } catch (Throwable t) {
            LOG.error("Failed to read from input due to unexpected Throwable", t);
            return null;
        }
    }
}

Thank you!

Regards,
-Yu
Hi Yu,

I think when the serializer returns null, the following operator should still receive a null record. One possible approach is for that operator to count the number of null records received and publish the count as a metric to a monitoring system such as Prometheus, where alert conditions can be configured. If null causes problems, a special indicator instance such as NULL_TBASE could be created instead, and the operator could count the number of NULL_TBASE records received.

Best,
Yun
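A minimal sketch of this counting idea, assuming a RichFlatMapFunction sits right after the deserialization step and that the metric is exported by whichever reporter (for example the Prometheus reporter) is configured for the job; the class name NullCountingFilter and the metric name numCorruptRecords are made up for illustration, and if a plain null cannot flow through the pipeline the same counting can be done against the NULL_TBASE marker instead:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;
import org.apache.thrift.TBase;

// Counts records that the serializer turned into null and drops them,
// while forwarding all other records unchanged.
public class NullCountingFilter extends RichFlatMapFunction<TBase, TBase> {

    private transient Counter numCorruptRecords;

    @Override
    public void open(Configuration parameters) {
        // Exposed through whatever metric reporter (e.g. Prometheus) is configured.
        numCorruptRecords = getRuntimeContext()
                .getMetricGroup()
                .counter("numCorruptRecords");
    }

    @Override
    public void flatMap(TBase value, Collector<TBase> out) {
        if (value == null) {
            numCorruptRecords.inc();
        } else {
            out.collect(value);
        }
    }
}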
Thanks for the suggestion, Yun!

On Sun, May 31, 2020 at 11:15 PM Yun Gao <[hidden email]> wrote:
A common approach is to use a dead letter queue, which is an extra output for bad input. So the result of the read operation would look like Tuple2<TBase, byte[]> (or use Either in Scala) and return the parsed TBase on success, or else put in the invalid record as byte[]. Then in your DAG, split the handling of the input:

DataStream<Tuple2<TBase, byte[]>> input = ...;

DataStream<TBase> goodInput = input
    .filter(recordOrRaw -> recordOrRaw.f0 != null)
    .map(recordOrRaw -> recordOrRaw.f0);
// continue normal processing with goodInput

DataStream<byte[]> badInput = input
    .filter(recordOrRaw -> recordOrRaw.f1 != null)
    .map(recordOrRaw -> recordOrRaw.f1);
badInput.write... // for example to Kafka

Then you simply need to monitor the output of badInput. You can also easily check why the records could not be parsed and try to develop some recovery logic if possible.

On Mon, Jun 1, 2020 at 9:33 AM Yu Yang <[hidden email]> wrote:
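A minimal sketch of what the parsing side of this dead-letter approach could look like, assuming the raw records arrive as byte[] from the source and are parsed in a Flink DeserializationSchema rather than inside the Kryo serializer; the class name RecordOrRawSchema is made up for illustration and parseRecord(...) is a hypothetical placeholder for the actual Thrift deserialization of the concrete generated class:

import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.thrift.TBase;

// Emits Tuple2.of(parsedRecord, null) for good records and
// Tuple2.of(null, rawBytes) for records that fail to parse.
public class RecordOrRawSchema implements DeserializationSchema<Tuple2<TBase, byte[]>> {

    @Override
    public Tuple2<TBase, byte[]> deserialize(byte[] message) throws IOException {
        try {
            TBase record = parseRecord(message);   // placeholder, see below
            return Tuple2.of(record, null);        // success: raw bytes not needed
        } catch (Throwable t) {
            return Tuple2.of(null, message);       // failure: keep raw bytes for the DLQ
        }
    }

    @Override
    public boolean isEndOfStream(Tuple2<TBase, byte[]> nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Tuple2<TBase, byte[]>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<TBase, byte[]>>() {});
    }

    // Hypothetical placeholder: replace with the actual Thrift deserialization
    // of the concrete generated TBase class (for example using TDeserializer).
    private TBase parseRecord(byte[] bytes) throws Exception {
        throw new UnsupportedOperationException("replace with real parsing logic");
    }
}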
--
Arvid Heise | Senior Java Developer
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany