containThrowable missing in ExceptionUtils


Nicholas Walton
Hi,

I’m trying to implement a failure handler for Elasticsearch, based on the example in the Flink documentation:

DataStream<String> input = ...;

input.addSink(new ElasticsearchSink<>(
    config, transportAddresses,
    new ElasticsearchSinkFunction<String>() {...},
    new ActionRequestFailureHandler() {
        @Override
        public void onFailure(ActionRequest action,
                Throwable failure,
                int restStatusCode,
                RequestIndexer indexer) throws Throwable {

            if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
                // full queue; re-add document for indexing
                indexer.add(action);
            } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
                // malformed document; simply drop request without failing sink
            } else {
                // for all other failures, fail the sink
                // here the failure is simply rethrown, but users can also choose to throw custom exceptions
                throw failure;
            }
        }
    }));

However, I can only find ExceptionUtils.containsThrowable in Flink 1.3; it is not present in 1.8. Am I mistaken? If not, how can I implement this using findThrowable?

TIA

Nick
Re: containThrowable missing in ExceptionUtils

Chesnay Schepler
The listed method no longer exists; it was subsumed by ExceptionUtils#findThrowable, which also gives access to the matching Throwable if one is found.

I have filed FLINK-14334 for updating the documentation.
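
To illustrate the replacement described above, here is a minimal, self-contained sketch. It assumes that Flink's ExceptionUtils.findThrowable(Throwable, Class<T>) returns an Optional<T>, so the old containsThrowable check becomes findThrowable(...).isPresent(). So that it runs without Flink or Elasticsearch on the classpath, the sketch uses a local stand-in for findThrowable and two hypothetical exception types (QueueFullException, MalformedDocumentException) in place of EsRejectedExecutionException and ElasticsearchParseException. The classify helper is also hypothetical and only mirrors the three branches of the handler.

```java
import java.util.Optional;

class FindThrowableSketch {

    // Hypothetical stand-ins for the Elasticsearch exception types in the question.
    public static class QueueFullException extends RuntimeException {
        public QueueFullException(String message) { super(message); }
    }

    public static class MalformedDocumentException extends RuntimeException {
        public MalformedDocumentException(String message) { super(message); }
    }

    // Local stand-in (an assumption, not Flink's code) for
    // ExceptionUtils.findThrowable(Throwable, Class<T>): walks the cause chain
    // and returns the first throwable assignable to searchType.
    public static <T extends Throwable> Optional<T> findThrowable(
            Throwable throwable, Class<T> searchType) {
        Throwable t = throwable;
        while (t != null) {
            if (searchType.isInstance(t)) {
                return Optional.of(searchType.cast(t));
            }
            t = t.getCause();
        }
        return Optional.empty();
    }

    // Mirrors the branches of onFailure: in a real handler, "RETRY" would be
    // indexer.add(action), "DROP" would do nothing, "FAIL" would rethrow failure.
    public static String classify(Throwable failure) {
        if (findThrowable(failure, QueueFullException.class).isPresent()) {
            return "RETRY";
        } else if (findThrowable(failure, MalformedDocumentException.class).isPresent()) {
            return "DROP";
        } else {
            return "FAIL";
        }
    }

    public static void main(String[] args) {
        Throwable wrapped =
                new RuntimeException("bulk failed", new QueueFullException("queue full"));
        System.out.println(classify(wrapped));
        // Unlike the old boolean check, the matching throwable itself is available.
        findThrowable(wrapped, QueueFullException.class)
                .ifPresent(e -> System.out.println(e.getMessage()));
    }
}
```

In the handler from the question, the same change would amount to replacing each ExceptionUtils.containsThrowable(failure, X.class) with ExceptionUtils.findThrowable(failure, X.class).isPresent().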

On 02/10/2019 15:48, Nicholas Walton wrote: