How to use ExceptionUtils.findThrowable() in Scala


Averell
Hello,

I am trying to implement error handling in the Elasticsearch sink, following
the (seemingly outdated) Flink documentation [1]:

<code>
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.util.ExceptionUtils
import org.elasticsearch.action.ActionRequest

override def onFailure(actionRequest: ActionRequest, failure: Throwable,
                       restStatusCode: Int, indexer: RequestIndexer): Unit = {
  // findThrowable returns a java.util.Optional, so test it with isPresent.
  if (ExceptionUtils.findThrowable(failure,
      classOf[org.elasticsearch.index.engine.VersionConflictEngineException]).isPresent) {
    // Passing the Throwable as the last argument lets the logger print the stack trace.
    LOG.warn("Failed inserting record to ElasticSearch: statusCode {} message: {} record: {}.\nRetrying",
      restStatusCode.toString, failure.getMessage, actionRequest.toString, failure)
    // Do something here
  } else {
    LOG.error(s"ELASTICSEARCH FAILED:\n    statusCode $restStatusCode\n    message: ${failure.getMessage}", failure)
  }
}
</code>
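
A side note on the Optional check itself: findThrowable returns a java.util.Optional, and in Scala `!=` delegates to `equals`, so comparing against Optional.empty() and calling isPresent should give the same answer. The sketch below illustrates this with the standard library only; `lookup` is a hypothetical stand-in for a call that returns an Optional, not a Flink API.

```scala
import java.util.Optional

object OptionalCheck {
  // Hypothetical stand-in for any Java call that returns an Optional.
  def lookup(found: Boolean): Optional[String] =
    if (found) Optional.of("match") else Optional.empty[String]()

  // Converts a Java Optional to a Scala Option without extra libraries.
  def toOption[T](opt: Optional[T]): Option[T] =
    if (opt.isPresent) Some(opt.get) else None

  def main(args: Array[String]): Unit = {
    val hit  = lookup(found = true)
    val miss = lookup(found = false)

    // Both styles of check agree for a present value and for an empty one.
    println(hit.isPresent)               // prints true
    println(hit != Optional.empty())     // prints true
    println(miss.isPresent)              // prints false
    println(miss != Optional.empty())    // prints false

    // A Scala Option allows pattern matching on the result.
    toOption(hit) match {
      case Some(value) => println(s"found: $value")
      case None        => println("nothing found")
    }
  }
}
```

If that holds, the comparison style is unlikely to be what sends the handler into the "else" branch.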

I want to handle VersionConflictEngineException differently from other
failures, but the check never matches. Execution always reaches the "else"
branch, so my log message is:

ELASTICSEARCH FAILED:
    statusCode 409
    message: Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][...]: version conflict, document already exists (current version [1])]

Thanks and best regards,
Averell

[1] Handling failing Elasticsearch requests:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html#handling-failing-elasticsearch-requests




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: How to use ExceptionUtils.findThrowable() in Scala

Averell
P.S. Here is the full stack trace:

2019-02-07 01:53:12.790 [I/O dispatcher 16] ERROR o.a.f.s.connectors.elasticsearch.ElasticsearchSinkBase  - Failed Elasticsearch item request: [...][[...][1]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][...]: version conflict, document already exists (current version [1])]]
org.elasticsearch.ElasticsearchException: Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][...]: version conflict, document already exists (current version [1])]
        at org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
        at org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
        at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
        at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
        at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
        at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
        at org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
        at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
        at org.elasticsearch.client.RestClient$1.completed(RestClient.java:375)
        at org.elasticsearch.client.RestClient$1.completed(RestClient.java:366)
        at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
        at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
        at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
        at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
        at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
        at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
        at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
        at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:121)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
        at java.lang.Thread.run(Thread.java:748)
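
One reading of this trace: the failure is created in ElasticsearchException.fromXContent, that is, parsed back from the REST response, and its class is org.elasticsearch.ElasticsearchException with the original type only named in the message. If so, a class-based lookup for VersionConflictEngineException would find nothing in the cause chain. The sketch below reproduces that situation with the standard library only. `GenericEsException`, `VersionConflictException`, `findByClass` and `isVersionConflict` are all hypothetical names introduced for illustration; they are not Flink or Elasticsearch APIs, and matching on status code 409 plus the type string is an assumption, not a confirmed fix.

```scala
import scala.annotation.tailrec

object CauseChainCheck {
  // Hypothetical stand-ins for the generic and the specific exception class.
  class GenericEsException(msg: String) extends RuntimeException(msg)
  class VersionConflictException(msg: String) extends GenericEsException(msg)

  // Walks the cause chain and returns the first throwable that is an
  // instance of `cls`, approximating a class-based lookup.
  @tailrec
  def findByClass[T <: Throwable](t: Throwable, cls: Class[T]): Option[T] =
    if (t == null) None
    else if (cls.isInstance(t)) Some(cls.cast(t))
    else findByClass(t.getCause, cls)

  // Assumed alternative: match on the REST status code and the type string
  // that appears in the message of the logged exception.
  def isVersionConflict(failure: Throwable, restStatusCode: Int): Boolean =
    restStatusCode == 409 &&
      Option(failure.getMessage).exists(_.contains("version_conflict_engine_exception"))

  def main(args: Array[String]): Unit = {
    // Same shape as the logged failure: a generic exception whose message
    // names the conflict type.
    val failure = new GenericEsException(
      "Elasticsearch exception [type=version_conflict_engine_exception, " +
        "reason=[_doc][...]: version conflict, document already exists (current version [1])]")

    // The class-based lookup misses because no throwable in the chain has the specific class.
    println(findByClass(failure, classOf[VersionConflictException]).isDefined) // prints false

    // The status-code and message check recognises the conflict.
    println(isVersionConflict(failure, 409)) // prints true
  }
}
```

In the onFailure handler, a check along the lines of `isVersionConflict(failure, restStatusCode)` could then replace the class-based condition, provided the assumption about the exception type holds for the client in use.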


