ElasticSearch failing when parallelised


Nicholas Walton
Hi,

I’m running ElasticSearch as the sink for a batch job that processes a CSV file of 6.2 million rows, each row holding 181 numeric values. The job quite happily processes a small sample of around 2,000 rows, running each column through a single parallel pipeline, keyed by column number, along the lines of the sketch below.
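
For reference, the per-column keying looks roughly like this. This is a simplified, untested sketch rather than my real code; env, csvPath, rows and perColumn are illustrative names:

import org.apache.flink.streaming.api.scala._

// Illustrative sketch of the per-column keying described above.
val rows: DataStream[Array[Double]] = env
  .readTextFile(csvPath)
  .map(_.split(",").map(_.toDouble))

val perColumn = rows
  .flatMap(row => row.zipWithIndex.map { case (v, col) => (col, v) })
  .keyBy(_._1) // one keyed sub-stream per column, processed in parallel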

However, once I scale up to the full data size, with parallelism set higher than one (typically eight), ElasticSearch fails after a while, as below:

2019-10-17 09:36:09,550 ERROR org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase  - Failed Elasticsearch bulk request: request retries exceeded max retry timeout [30000]
java.io.IOException: request retries exceeded max retry timeout [30000]
at org.elasticsearch.client.RestClient$1.retryIfPossible(RestClient.java:411)
at org.elasticsearch.client.RestClient$1.failed(RestClient.java:398)
at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:137)
at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.failed(AbstractClientExchangeHandler.java:419)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:375)
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492)
at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
at java.lang.Thread.run(Thread.java:745)
2019-10-17 09:36:09,576 INFO  org.example.Job$                                              - Failed ElasticSearch document. Exception rethrown
2019-10-17 09:36:09,624 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Error during disposal of stream operator.
java.lang.RuntimeException: An error occurred in ElasticsearchSink.
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:381)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:343)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:479)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:380)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: request retries exceeded max retry timeout [30000]
at org.elasticsearch.client.RestClient$1.retryIfPossible(RestClient.java:411)
at org.elasticsearch.client.RestClient$1.failed(RestClient.java:398)
at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:137)
at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.failed(AbstractClientExchangeHandler.java:419)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:375)
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492)
at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
... 1 more
2019-10-17 09:36:09,629 INFO  org.apache.flink.runtime.taskmanager.Task                     - Window(GlobalWindows(), CountTrigger, CountEvictor, ScalaProcessWindowFunctionWrapper) -> Map -> Map -> Map -> Sink: Unnamed (2/8) (cb74c5d1c9323000fae9488d7d256468) switched from RUNNING to FAILED.
java.lang.RuntimeException: An error occurred in ElasticsearchSink.
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:381)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:386)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at org.example.signalRecognitionWindowFunction.process(signalRecognitionWindowFunction.scala:42)
at org.example.signalRecognitionWindowFunction.process(signalRecognitionWindowFunction.scala:8)
at org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.emitWindowContents(EvictingWindowOperator.java:359)
at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.processElement(EvictingWindowOperator.java:218)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: request retries exceeded max retry timeout [30000]
at org.elasticsearch.client.RestClient$1.retryIfPossible(RestClient.java:411)
at org.elasticsearch.client.RestClient$1.failed(RestClient.java:398)
at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:137)
at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.failed(AbstractClientExchangeHandler.java:419)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:375)
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492)
at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
... 1 more


I’m running Flink 1.8.1 and version 1.9.0 of the ES connector, as a standalone cluster on a 16 GB MacBook Pro. Activity Monitor gives no indication that I’m running out of CPU or memory, and data is being written to disk at a fairly low rate of around 10–20 MB per second, well under the disk’s maximum write rate of 100+ MB per second. Nothing shows in the ElasticSearch log except occasional INFO messages about GC.

The code relevant to the sink is:

// Imports needed by the snippet (assuming the flink-connector-elasticsearch6
// artifact; adjust the ElasticsearchSink import to your connector version).
import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ActionRequestFailureHandler, ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.flink.util.ExceptionUtils
import org.apache.http.HttpHost
import org.elasticsearch.ElasticsearchParseException
import org.elasticsearch.action.ActionRequest
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException

// Both entries resolve to the same local node.
val httpHosts = new util.ArrayList[HttpHost]
httpHosts.add(new HttpHost("localhost", 9200, "http"))
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))

val esSinkBuilder = new ElasticsearchSink.Builder[(Int, Long, Double, String)](
  httpHosts,
  new ElasticsearchSinkFunction[(Int, Long, Double, String)] {

    // Build one index request per (bearing, sampleindex, sample, hashstring) tuple.
    def createIndexRequest(element: (Int, Long, Double, String)): IndexRequest = {
      val json = new util.HashMap[String, Any]
      json.put("arrayinstance", "beamarraytest")
      json.put("bearing", element._1)
      json.put("sampleindex", element._2)
      json.put("sample", element._3)
      json.put("hashstring", element._4)
      Requests.indexRequest()
        .index("flink-index")
        .`type`("flink-type")
        .source(json)
    }

    def process(element: (Int, Long, Double, String), ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
      indexer.add(createIndexRequest(element))
    }
  }
)

esSinkBuilder.setFailureHandler(
  new ActionRequestFailureHandler() {
    @throws(classOf[Throwable])
    override def onFailure(action: ActionRequest, failure: Throwable, restStatusCode: Int, indexer: RequestIndexer): Unit = {
      if (ExceptionUtils.findThrowable(failure, classOf[EsRejectedExecutionException]).isPresent) {
        // Bulk queue full: re-queue the action for the next bulk request.
        LOG.info("ElasticSearch full queue; re-added document for indexing")
        indexer.add(action)
      } else if (ExceptionUtils.findThrowable(failure, classOf[ElasticsearchParseException]).isPresent) {
        // Malformed document: drop it rather than fail the job.
        LOG.info("Malformed ElasticSearch document. Document dropped")
      } else {
        // Anything else aborts the job.
        LOG.info("Failed ElasticSearch document. Exception rethrown")
        throw failure
      }
    }
  }
)

esSinkBuilder.setBulkFlushMaxActions(10000)

signalFourBuckets.addSink(esSinkBuilder.build)
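
The only mitigations I can think of so far are flushing smaller bulks with backoff and raising the REST client's retry timeout, roughly as below. This is an untested sketch that assumes the elasticsearch6 connector's builder methods and the ES 6.x RestClientBuilder.setMaxRetryTimeoutMillis API; all the numbers are placeholders:

// Untested sketch: throttle bulk flushes and raise the REST client retry timeout.
// Assumes the flink-connector-elasticsearch6 builder API; values are placeholders.
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory
import org.elasticsearch.client.RestClientBuilder

esSinkBuilder.setBulkFlushMaxActions(1000)   // smaller bulks than the 10000 above
esSinkBuilder.setBulkFlushInterval(1000)     // also flush at least once per second
esSinkBuilder.setBulkFlushBackoff(true)      // back off when ES rejects a bulk
esSinkBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL)
esSinkBuilder.setBulkFlushBackoffRetries(3)
esSinkBuilder.setBulkFlushBackoffDelay(1000) // initial backoff delay in ms

// Raise the low-level client's max retry timeout from its 30 s default,
// which is the [30000] limit appearing in the stack traces above.
esSinkBuilder.setRestClientFactory(new RestClientFactory {
  override def configureRestClientBuilder(builder: RestClientBuilder): Unit = {
    builder.setMaxRetryTimeoutMillis(120000) // e.g. two minutes
  }
})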

Is my code at fault, or is the sink just not capable of handling the flow rate? Advice of any form would be very gratefully received.

TIA

Nick Walton