Exception handling

Jacob Sevart
How do we get uncaught exceptions in operators to skip the problematic messages, rather than crash the entire job? Is there an easier or less mistake-prone way to do this than wrapping every operator method in try/catch? 
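One way to avoid repeating try/catch in every operator method is to write it once in a generic wrapper and apply that wrapper to each record-level function. The sketch below is plain Java with no Flink dependency. SkipOnError, ThrowingFunction and the failure counter are hypothetical names, not Flink API. The wrapper turns a function that may throw into one that returns Optional.empty() for a bad record instead of propagating the exception.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical helper (not part of Flink): centralizes the try/catch that would
// otherwise be repeated in every operator method.
final class SkipOnError {

    // Like java.util.function.Function, but allowed to throw checked exceptions,
    // mirroring the "throws Exception" on Flink's MapFunction#map.
    @FunctionalInterface
    interface ThrowingFunction<T, R> {
        R apply(T value) throws Exception;
    }

    private SkipOnError() {}

    // Returns a function that yields Optional.of(result) on success and
    // Optional.empty() when fn throws; every caught exception increments `failures`.
    // A null result also becomes Optional.empty() but is not counted as a failure.
    static <T, R> Function<T, Optional<R>> wrap(ThrowingFunction<T, R> fn, AtomicLong failures) {
        return value -> {
            try {
                return Optional.ofNullable(fn.apply(value));
            } catch (Exception e) {
                // In a real job you would also log the offending record here.
                failures.incrementAndGet();
                return Optional.empty();
            }
        };
    }
}
```

Only Exception is caught, so Errors such as OutOfMemoryError still fail the job, which is usually the desired behavior. In a Flink job the same try/catch would more likely live in one reusable FlatMapFunction (or a ProcessFunction, if bad records should be routed to a side output) that emits zero or one element per input. Flink serializes user functions, so any function held as a field of such an operator must be Serializable; the plain java.util.function.Function returned here is not.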

And what should we do about Map? Since it has to return something, we either return null and then drop it with a .filter(Objects::nonNull) in the next operator, or convert it to a FlatMap. Converting to FlatMap is annoying because we then need to mock the Collector in tests.

Obviously it would be best to sanitize inputs so that exceptions don't occur, but we've recently encountered some setbacks in the game of whack-a-mole with pathological messages, and are hoping to mitigate the losses when these do occur.

Jacob

Re: Exception handling

Till Rohrmann
Hi Jacob,

One of the contracts Flink has is that if a UDF throws an exception, the UDF has failed and the job needs recovery. Hence, it is the user's responsibility to make sure that tolerable exceptions do not bubble up. If you have dirty input data, it might make sense to put a sanitization operator directly after the sources that filters out invalid data, so that downstream operators can assume the data is correct.
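A minimal sketch of such a sanitization check, assuming (hypothetically) that raw records are strings of the form "userId,amount" and that a valid amount is a non-negative integer. RecordValidator and this record format are illustrative only. Because Flink's FilterFunction has a single method, boolean filter(T value), a static method reference such as RecordValidator::isValid should be usable with DataStream#filter right after the source; the sanitize helper below only imitates that with a java.util stream so the example runs without Flink.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical validator for records of the form "userId,amount".
// Intended to run once, right after the source, so that later operators
// can parse fields without defensive try/catch.
final class RecordValidator {

    private RecordValidator() {}

    static boolean isValid(String raw) {
        if (raw == null) {
            return false;
        }
        // The -1 limit keeps trailing empty fields, so "alice," is rejected.
        String[] fields = raw.split(",", -1);
        if (fields.length != 2 || fields[0].trim().isEmpty()) {
            return false;
        }
        try {
            return Long.parseLong(fields[1].trim()) >= 0;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    // Stand-in for stream.filter(RecordValidator::isValid) in a Flink job.
    static List<String> sanitize(List<String> input) {
        return input.stream().filter(RecordValidator::isValid).collect(Collectors.toList());
    }
}
```

Keeping the check a pure static method means it can be unit tested directly, and every downstream operator benefits from it without repeating the validation.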

For the question about Map, you can either convert it to a FlatMap operation, which can output arbitrarily many elements (including zero), or introduce something like an Optional type that can represent both an absent (null) value and a present (non-null) value. This is something you can do in the user code.
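A minimal sketch combining both suggestions: keep the record-level logic in a plain method that returns Optional (so it never returns null and is easy to unit test), and make the operator itself a thin FlatMap that emits zero or one element. ParseAmount is a hypothetical example, and the java.util.function.Consumer parameter stands in for Flink's Collector so the code runs without Flink.

```java
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical example: parse a raw string into an amount without ever
// throwing or returning null.
final class ParseAmount {

    private ParseAmount() {}

    // Core logic: Optional.empty() marks a record that should be skipped.
    static Optional<Long> parse(String raw) {
        if (raw == null) {
            return Optional.empty();
        }
        try {
            return Optional.of(Long.parseLong(raw.trim()));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    // FlatMap-style adapter: emits zero or one element to `out`.
    // In a Flink job the equivalent would be something like
    //   stream.flatMap((String raw, Collector<Long> out) -> ParseAmount.parse(raw).ifPresent(out::collect))
    //         .returns(Types.LONG);
    // where the returns(...) type hint is needed because of Java type erasure.
    static void flatMap(String raw, Consumer<Long> out) {
        parse(raw).ifPresent(out);
    }
}
```

With this split, only parse needs thorough tests, and the FlatMap adapter is a one-liner. flink-core also includes a small ListCollector utility (org.apache.flink.api.common.functions.util.ListCollector) that collects into a java.util.List; if it is available in your Flink version, it can replace a Collector mock when testing the FlatMap itself.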

I hope this helps a bit.

Cheers,
Till

On Tue, Apr 27, 2021 at 7:30 PM Jacob Sevart wrote: [original question quoted above]