Hello,
I am trying to implement error handling in the Elasticsearch sink, following the seemingly outdated Flink documentation [1]:

<code>
override def onFailure(actionRequest: ActionRequest, failure: Throwable, restStatusCode: Int, indexer: RequestIndexer): Unit = {
  if (ExceptionUtils.findThrowable(failure, classOf[org.elasticsearch.index.engine.VersionConflictEngineException]) != Optional.empty()) {
    LOG.warn("Failed inserting record to ElasticSearch: statusCode {} message: {} record: {} stacktrace {}.\nRetrying", restStatusCode.toString, failure.getMessage, actionRequest.toString, failure.getStackTrace)
    // Do something here
  } else {
    LOG.error(s"ELASTICSEARCH FAILED:\n statusCode $restStatusCode\n message: ${failure.getMessage}\n${failure.getStackTrace}")
  }
}
</code>

I tried to handle VersionConflictEngineException differently from other failures, but it did not work. Execution always reaches the "else" branch, so my log message is:

ELASTICSEARCH FAILED:
 statusCode 409
 message: Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][...]: version conflict, document already exists (current version [1])]

Thanks and best regards,
Averell

[1] handling-failing-elasticsearch-requests <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html#handling-failing-elasticsearch-requests>

-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
P.S. This is the full stack trace:
2019-02-07 01:53:12.790 [I/O dispatcher 16] ERROR o.a.f.s.connectors.elasticsearch.ElasticsearchSinkBase - Failed Elasticsearch item request: [...][[...][1]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][...]: version conflict, document already exists (current version [1])]]
org.elasticsearch.ElasticsearchException: Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][...]: version conflict, document already exists (current version [1])]
    at org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
    at org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
    at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
    at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
    at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
    at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
    at org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
    at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
    at org.elasticsearch.client.RestClient$1.completed(RestClient.java:375)
    at org.elasticsearch.client.RestClient$1.completed(RestClient.java:366)
    at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
    at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
    at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:121)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
    at java.lang.Thread.run(Thread.java:748)
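
A possible reading of the trace above: the failure arrives as a generic org.elasticsearch.ElasticsearchException parsed from the REST response, with the conflict named only in the message text (type=version_conflict_engine_exception) and in the 409 status code. If that is the case, a class-based lookup for VersionConflictEngineException would find nothing. The following is a minimal sketch of a check based on the status code and message instead. The object VersionConflictCheck and its methods are hypothetical helpers written for illustration, not part of Flink or Elasticsearch, and treating every 409 as a version conflict is an assumption.

```scala
import scala.annotation.tailrec

// Hypothetical helper, not part of Flink or Elasticsearch.
object VersionConflictCheck {
  // Error type string as it appears in the log message quoted in this thread.
  private val ConflictType = "version_conflict_engine_exception"

  // Walks the cause chain and reports whether any message mentions the conflict type.
  @tailrec
  def messageMentionsConflict(t: Throwable): Boolean =
    if (t == null) false
    else if (Option(t.getMessage).exists(_.contains(ConflictType))) true
    else messageMentionsConflict(t.getCause)

  // Assumption: HTTP 409 (the status code in the log above) always means a version conflict.
  def isVersionConflict(failure: Throwable, restStatusCode: Int): Boolean =
    restStatusCode == 409 || messageMentionsConflict(failure)
}
```

Inside onFailure, the condition could then read `if (VersionConflictCheck.isVersionConflict(failure, restStatusCode))` in place of the ExceptionUtils.findThrowable comparison. Matching on message text is fragile across Elasticsearch versions, so the status code is probably the more stable of the two signals.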