HI,
I’m running ElasticSearch as a sink for a batch file processing a CSV file of 6.2 million rows, with each row being 181 numeric values. It quite happily processes a small example of around 2,000 rows, running each column through a single parallel pipeline, keyed by column number. However, once I scale up to the full data size, with parallelism set higher than one typically eight, after a while ElasticSearch fails as below 2019-10-17 09:36:09,550 ERROR org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase - Failed Elasticsearch bulk request: request retries exceeded max retry timeout [30000] java.io.IOException: request retries exceeded max retry timeout [30000] at org.elasticsearch.client.RestClient$1.retryIfPossible(RestClient.java:411) at org.elasticsearch.client.RestClient$1.failed(RestClient.java:398) at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:137) at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.failed(AbstractClientExchangeHandler.java:419) at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:375) at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92) at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39) at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175) at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263) at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492) at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213) at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280) at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588) at java.lang.Thread.run(Thread.java:745) 2019-10-17 09:36:09,576 INFO org.example.Job$ - Failed ElasticSearch document. Exception rethrown 2019-10-17 09:36:09,624 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator. java.lang.RuntimeException: An error occurred in ElasticsearchSink. at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:381) at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:343) at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:479) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:380) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: request retries exceeded max retry timeout [30000] at org.elasticsearch.client.RestClient$1.retryIfPossible(RestClient.java:411) at org.elasticsearch.client.RestClient$1.failed(RestClient.java:398) at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:137) at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.failed(AbstractClientExchangeHandler.java:419) at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:375) at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92) at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39) at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175) at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263) at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492) at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213) at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280) at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588) ... 1 more 2019-10-17 09:36:09,629 INFO org.apache.flink.runtime.taskmanager.Task - Window(GlobalWindows(), CountTrigger, CountEvictor, ScalaProcessWindowFunctionWrapper) -> Map -> Map -> Map -> Sink: Unnamed (2/8) (cb74c5d1c9323000fae9488d7d256468) switched from RUNNING to FAILED. java.lang.RuntimeException: An error occurred in ElasticsearchSink. at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:381) at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:386) at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at org.example.signalRecognitionWindowFunction.process(signalRecognitionWindowFunction.scala:42) at org.example.signalRecognitionWindowFunction.process(signalRecognitionWindowFunction.scala:8) at org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32) at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.emitWindowContents(EvictingWindowOperator.java:359) at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.processElement(EvictingWindowOperator.java:218) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: request retries exceeded max retry timeout [30000] at org.elasticsearch.client.RestClient$1.retryIfPossible(RestClient.java:411) at org.elasticsearch.client.RestClient$1.failed(RestClient.java:398) at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:137) at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.failed(AbstractClientExchangeHandler.java:419) at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:375) at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92) at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39) at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175) at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263) at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492) at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213) at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280) at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588) ... 1 more I’m running Flink 1.8.1 and version 1.9.0 of the ES connector. I’m running as a standalone cluster on a 16Gb MacBook Pro. The ActivityMonitor is showing no indication of running out of resource CPU or memory, and data is being written to disk at a fairly low rate of around 10-20Mb per second well under the max write rate of 100Mb+ per second. Nothing is showing on the ElasticSearch log save INFO messages about GC occasionally. The code relevant to the sink is val httpHosts = new ArrayList[HttpHost]
Is my code at fault, or is the sink just not capable of handling the flow rate? Advice of any form would be very gratefully received. TIA Nick Walton |
Free forum by Nabble | Edit this page |