Hi,
I need help with handling errors in the Elasticsearch sink. The log shows:

2019-11-19 08:09:09,043 ERROR org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase - Failed Elasticsearch item request:
[flink-index-deduplicated/nHWQM0XMSTatRri7zw_s_Q][[flink-index-deduplicated][13]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[75:108]: version conflict, document already exists (current version [1])]]
[flink-index-deduplicated/nHWQM0XMSTatRri7zw_s_Q][[flink-index-deduplicated][13]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[75:108]: version conflict, document already exists (current version [1])]]
    at org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
    at org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
    at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
    at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
    at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
    at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
    at org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
    at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
    at org.elasticsearch.client.RestClient$1.completed(RestClient.java:375)
    at org.elasticsearch.client.RestClient$1.completed(RestClient.java:366)
    at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
    at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
    at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
    at java.lang.Thread.run(Thread.java:748)

The error is expected, since I am creating documents with duplicate ids on purpose: when a previous batch was only partially loaded, or a request timed out, I upload the document a second time to ensure it is definitely indexed and not lost to the timeout. The document is created as:

    val json = new util.HashMap[String, Any]
    json.put("arrayinstance", esid)
    json.put("bearing", element._1)
    json.put("sampleindex", element._2)
    json.put("sample", element._3)
    json.put("hashstring", element._4)
    json.put("priorrepeats", element._5)
    return Requests.indexRequest()
      .index("flink-index-deduplicated")
      .`type`("_doc")
      .id(element._1 + ":" + element._2)
      .create(true)
      .source(json)
  }

My problem is: how can I catch this failure, recover, and carry on?
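For context on why the conflict is expected rather than a data error: the document id is deterministic, so resending an element after a timeout targets exactly the same id, and create(true) makes Elasticsearch reject the second copy instead of silently overwriting it. A tiny sketch of that id scheme (makeDocId and its parameter names/types are illustrative assumptions; the post only shows element._1 + ":" + element._2):

```scala
// Illustrative only: the id is built from the bearing and the sample index,
// so a replayed element always maps to the same Elasticsearch document id.
def makeDocId(bearing: Int, sampleIndex: Long): String =
  s"$bearing:$sampleIndex"

// The reason=[75:108] in the log above is exactly such an id: the element
// with bearing 75 and sample index 108 was sent twice, and the second
// create(true) failed with a version conflict because the first copy is
// already indexed -- i.e. nothing was actually lost.
```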
I have set a failure handler as below, which will need extending to handle the failure above:

    esSinkBuilder.setFailureHandler(
      new ActionRequestFailureHandler() {
        @throws(classOf[Throwable])
        override def onFailure(action: ActionRequest,
                               failure: Throwable,
                               restStatusCode: Int,
                               indexer: RequestIndexer): Unit = {
          if (ExceptionUtils.findThrowable(failure, classOf[EsRejectedExecutionException]).isPresent) {
            Job.LOG.info("ElasticSearch full queue; re-added document for indexing")
            indexer.add(action)
          } else if (ExceptionUtils.findThrowable(failure, classOf[ElasticsearchParseException]).isPresent) {
            LOG.info("Malformed ElasticSearch document. Document dropped")
          } else if (ExceptionUtils.findThrowable(failure, classOf[java.net.SocketTimeoutException]).isPresent) {
            LOG.info("ElasticSearch document timeout; re-added document for indexing")
            indexer.add(action)
          /* } else if (ExceptionUtils.findThrowable(failure, classOf[]).isPresent) {
            LOG.info("ElasticSearch document duplicate; ignored document") */
          } else {
            // for all other failures, fail the sink;
            // here the failure is simply rethrown, but users can also choose to throw custom exceptions
            Job.LOG.info(failure.getMessage)
            throw failure
          }
        }
      }
    )

I have tried just ignoring the failure by removing the "throw failure", but to no avail.
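One way to extend the handler without knowing the exact exception class is to branch on the restStatusCode argument that onFailure already receives: Elasticsearch reports version_conflict_engine_exception as HTTP 409 Conflict, and a 409 on a create(true) request means the document is already indexed, so it can safely be dropped. Below is a minimal, self-contained sketch of that decision logic as a pure function (the FailureAction names and the classify helper are my own, not Flink or Elasticsearch API); inside onFailure, the Drop case would simply log and return without calling indexer.add or rethrowing:

```scala
object FailureClassifier {
  sealed trait FailureAction
  case object Retry extends FailureAction // re-add via indexer.add(action)
  case object Drop  extends FailureAction // log and return; document is safe
  case object Fail  extends FailureAction // rethrow and fail the sink

  // Decide what to do from the REST status code alone.
  // 409 Conflict: version_conflict_engine_exception from create(true) on an
  //   existing id -- the document is already there, so dropping is safe.
  // 429 Too Many Requests: the bulk queue is full (EsRejectedExecutionException),
  //   so the request should be retried.
  // Anything else is unexpected and should fail the sink.
  def classify(restStatusCode: Int): FailureAction = restStatusCode match {
    case 409 => Drop
    case 429 => Retry
    case _   => Fail
  }
}
```

An alternative, if matching on the status code feels too broad, is to match on the exception message instead (Flink's ExceptionUtils offers a findThrowableWithMessage variant in recent versions, though check that your Flink version provides it) and look for "version_conflict_engine_exception".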