best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?



Yu Yang
Hi all, 

To deal with corrupted messages that occasionally leak into the data source, we implemented a custom DefaultKryoSerializer class (below) that catches exceptions. It returns null from read(...) when it encounters an exception while reading. With this implementation, the serializer may silently drop records, and one concern is that it could drop too many records before we notice and take action. What is the best practice for handling this?

The serializer processes one record at a time. Will reading a corrupted record cause the serializer to fail on the next, valid record?

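import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.twitter.chill.thrift.TBaseSerializer;
import org.apache.thrift.TBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomTBaseSerializer extends TBaseSerializer {
    private static final Logger LOG = LoggerFactory.getLogger(CustomTBaseSerializer.class);

    @Override
    public void write(Kryo kryo, Output output, TBase tBase) {
        try {
            super.write(kryo, output, tBase);
        } catch (Throwable t) {
            // Logs and swallows the failure, so this record is effectively dropped on the write side.
            LOG.error("Failed to write due to unexpected Throwable", t);
        }
    }

    @Override
    public TBase read(Kryo kryo, Input input, Class<TBase> tBaseClass) {
        try {
            return super.read(kryo, input, tBaseClass);
        } catch (Throwable t) {
            // Logs the failure and returns null instead of propagating the exception.
            LOG.error("Failed to read from input due to unexpected Throwable", t);
            return null;
        }
    }
}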

Thank you!

Regards, 
-Yu

Re: best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?

Yun Gao
Hi Yu,

I think that when the serializer returns null, the following operator should still receive a null record. One option is for that operator to count the null records it receives and publish the count as a metric to a monitoring system such as Prometheus, where alert conditions can be configured.

If null causes problems, a special sentinel instance such as NULL_TBASE could be returned instead, and the operator could count the number of NULL_TBASE records it receives.
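Yun's counting idea can be sketched as follows. This is a dependency-free model, not Flink code: the hypothetical Message class stands in for TBase, NULL_RECORD plays the role of NULL_TBASE, and an AtomicLong stands in for a Flink metric Counter. In an actual job the counter would presumably be registered in the open() method of a RichFilterFunction via getRuntimeContext().getMetricGroup().counter(...) and exported through a metric reporter (for example Flink's Prometheus reporter), where the alert is configured.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class SentinelCounting {

    /** Hypothetical stand-in for a Thrift TBase record. */
    static final class Message {
        final String payload;

        Message(String payload) {
            this.payload = payload;
        }
    }

    /** Shared sentinel the serializer would return instead of null (the NULL_TBASE idea). */
    static final Message NULL_RECORD = new Message("<corrupted>");

    /** Drops sentinels and nulls, counting each one; real records pass through. */
    static final class DropAndCount {
        // Stand-in for a Flink metric Counter.
        final AtomicLong corrupted = new AtomicLong();

        boolean filter(Message m) {
            // Identity check: assumes the sentinel reaches this operator directly from read(),
            // i.e. it has not been serialized and deserialized again in between.
            if (m == null || m == NULL_RECORD) {
                corrupted.incrementAndGet();
                return false;
            }
            return true;
        }
    }

    public static void main(String[] args) {
        DropAndCount dropAndCount = new DropAndCount();
        List<Message> kept = new ArrayList<>();
        List<Message> input = List.of(new Message("a"), NULL_RECORD, new Message("b"), NULL_RECORD);
        for (Message m : input) {
            if (dropAndCount.filter(m)) {
                kept.add(m);
            }
        }
        // Prints "kept=2 corrupted=2"
        System.out.println("kept=" + kept.size() + " corrupted=" + dropAndCount.corrupted.get());
    }
}
```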

Best,
 Yun


------------------Original Mail ------------------
Sender:Yu Yang <[hidden email]>
Send Date:Mon Jun 1 06:37:35 2020
Recipients:user <[hidden email]>
Subject:best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?

Re: best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?

Yu Yang
Thanks for the suggestion, Yun! 

On Sun, May 31, 2020 at 11:15 PM Yun Gao <[hidden email]> wrote:

Re: best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?

Arvid Heise
A common approach is to use a dead letter queue, which is an extra output for bad input.

So the read operation would return a Tuple2<TBase, byte[]> (or an Either in Scala) holding the parsed TBase on success and otherwise the raw bytes of the invalid record.

Then in your DAG, split the handling of the input:

DataStream<Tuple2<TBase, byte[]>> input = ...;
// f0 = parsed record on success, f1 = raw bytes of a record that failed to parse.
// Note: Flink's TupleSerializer rejects null fields; org.apache.flink.types.Either avoids that.
DataStream<TBase> goodInput = input.filter(recordOrRaw -> recordOrRaw.f0 != null).map(recordOrRaw -> recordOrRaw.f0);
// continue normal processing with goodInput
DataStream<byte[]> badInput = input.filter(recordOrRaw -> recordOrRaw.f1 != null).map(recordOrRaw -> recordOrRaw.f1);
badInput.write... // for example to Kafka

You then only need to monitor the output of badInput. Because the raw bytes are kept, you can also easily check why those records could not be parsed and, where possible, develop some recovery logic.
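The shape of the dead letter queue result can also be modeled without Flink. In this sketch the hypothetical ParseResult class plays the role of the Tuple2<TBase, byte[]> (or Either), and the hypothetical tryParse accepts decimal integers in place of Thrift deserialization. In a real job the two lists would correspond to the goodInput and badInput streams, with badInput written to the dead letter sink.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class DeadLetterSplit {

    /** Holds either a parsed value or the raw bytes that failed to parse; exactly one is non-null. */
    static final class ParseResult<T> {
        final T value;
        final byte[] raw;

        private ParseResult(T value, byte[] raw) {
            this.value = value;
            this.raw = raw;
        }

        static <T> ParseResult<T> ok(T value) {
            return new ParseResult<>(value, null);
        }

        static <T> ParseResult<T> bad(byte[] raw) {
            return new ParseResult<>(null, raw);
        }
    }

    /** Hypothetical parser standing in for Thrift deserialization: accepts decimal integers only. */
    static ParseResult<Integer> tryParse(byte[] bytes) {
        try {
            return ParseResult.ok(Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)));
        } catch (NumberFormatException e) {
            // Keep the original bytes so the record can be inspected or replayed later.
            return ParseResult.bad(bytes);
        }
    }

    public static void main(String[] args) {
        List<byte[]> source = List.of(
                "42".getBytes(StandardCharsets.UTF_8),
                "not-a-number".getBytes(StandardCharsets.UTF_8),
                "7".getBytes(StandardCharsets.UTF_8));

        List<Integer> goodInput = new ArrayList<>();
        List<byte[]> badInput = new ArrayList<>(); // would go to the dead letter sink
        for (byte[] bytes : source) {
            ParseResult<Integer> result = tryParse(bytes);
            if (result.value != null) {
                goodInput.add(result.value);
            } else {
                badInput.add(result.raw);
            }
        }
        // Prints "good=[42, 7] bad=1"
        System.out.println("good=" + goodInput + " bad=" + badInput.size());
    }
}
```

Keeping the raw bytes rather than just a failure count is what makes later diagnosis and recovery possible.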

On Mon, Jun 1, 2020 at 9:33 AM Yu Yang <[hidden email]> wrote:


--

Arvid Heise | Senior Java Developer

