Elasticsearch sink connector timeout

Kai Fu
Hi team,

We frequently hit timeouts in the Elasticsearch sink connector. The ES cluster is far from saturated (~40% CPU utilization, no query load, and a low indexing rate). We are using the ES-7 connector [1] with 12 data nodes and a parallelism of 32.

The error log is below. Is there any way to locate the root cause, or to configure the timeout parameter?

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/


2021-06-05 11:49:10
java.lang.RuntimeException: An error occurred in ElasticsearchSink.
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:427)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:432)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:329)
    at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
    at org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnChangelog(DeduplicateFunctionHelper.java:112)
    at org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:80)
    at org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:32)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-21 [ACTIVE]
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
    ... 1 more

Config:
WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'https://xxx:443',
    'index' = 'xxx',
    'sink.bulk-flush.max-actions' = '10000',
    'sink.bulk-flush.max-size' = '2mb',
    'sink.flush-on-checkpoint' = 'true',
    'connection.max-retry-timeout' = '60s',
    'failure-handler' = 'retry-rejected',
    'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
    'sink.bulk-flush.interval' = '2s'
);

--
Best wishes,
- Kai

Re: Elasticsearch sink connector timeout

Kai Fu
After some investigation of the task manager's log, the exception appears to be raised from the RetryRejectedExecutionFailureHandler path. The related log entries are shown below; I am not sure why this happens.


2021-06-05 05:31:31,529 INFO org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler [] - Bulk request 1033 has been cancelled.
java.lang.InterruptedException: null
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[?:1.8.0_272]
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[?:1.8.0_272]
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) ~[?:1.8.0_272]
    at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]
2021-06-05 05:31:31,530 ERROR org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler [] - Failed Elasticsearch item request: null
java.lang.InterruptedException: null
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[?:1.8.0_272]
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[?:1.8.0_272]
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) ~[?:1.8.0_272]
    at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) [flink-dist_2.11-1.13.1.jar:1.13.1]

2021-06-05 05:31:31,633 ERROR org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler [] - Failed Elasticsearch item request: Connection closed unexpectedly
org.apache.flink.elasticsearch7.shaded.org.apache.http.ConnectionClosedException: Connection closed unexpectedly
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.closed(HttpAsyncRequestExecutor.java:146) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:71) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:39) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.disconnected(AbstractIODispatch.java:100) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionClosed(BaseIOReactor.java:277) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processClosedSessions(AbstractIOReactor.java:449) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:283) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]


--
Best wishes,
- Kai

Re: Elasticsearch sink connector timeout

Yangze Guo
Hi, Kai,

I think the exception is thrown from
RetryRejectedExecutionFailureHandler because you configured
'failure-handler' as 'retry-rejected'. That handler retries actions that
failed with EsRejectedExecutionException and rethrows all other failures.
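
For reference, the handler is selected through the 'failure-handler' option in the table's WITH clause. The fragment below is only a sketch: the table name and columns are hypothetical, the hosts and index placeholders are copied from the original post, and the comments paraphrase how the Flink 1.13 connector documentation describes the three built-in values.

```sql
-- Hypothetical sink table; only the 'failure-handler' line matters here.
CREATE TABLE es_sink (
    id STRING,
    payload STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'https://xxx:443',
    'index' = 'xxx',
    -- 'fail' (default): a failed request throws and fails the job
    -- 'ignore': failures are ignored and the request is dropped
    -- 'retry-rejected': requests that failed because of queue capacity
    --                   saturation are re-added; other failures still fail the job
    'failure-handler' = 'retry-rejected'
);
```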

AFAIK, there is no way to configure the connection/socket timeout in
the Elasticsearch SQL connector. However, if the root cause is network
jitter, you could increase sink.bulk-flush.backoff.delay and
sink.bulk-flush.backoff.max-retries.
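
As a sketch of where these two options go, the WITH clause from the original post could be extended as follows. The table name, the columns, and the values '10' and '1s' are illustrative assumptions rather than recommendations; the option names are the two mentioned above. These settings govern how bulk requests are retried, so whether they help depends on the failing request actually being retried by the connector.

```sql
-- Hypothetical sink table; hosts and index are placeholders as in the original post.
CREATE TABLE es_sink (
    id STRING,
    payload STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'https://xxx:443',
    'index' = 'xxx',
    'sink.bulk-flush.max-actions' = '10000',
    'sink.bulk-flush.max-size' = '2mb',
    'sink.bulk-flush.interval' = '2s',
    'sink.flush-on-checkpoint' = 'true',
    'connection.max-retry-timeout' = '60s',
    'failure-handler' = 'retry-rejected',
    'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
    -- Assumed example values; tune them against your own cluster.
    'sink.bulk-flush.backoff.max-retries' = '10',
    'sink.bulk-flush.backoff.delay' = '1s'
);
```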


Best,
Yangze Guo

>>     at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
>>     at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261)
>>     at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502)
>>     at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211)
>>     at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
>>     at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
>>     at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
>>     ... 1 more
>>
>> Config:
>> WITH (
>>     'connector' = 'elasticsearch-7',
>>     'hosts' = 'https://xxx:443',
>>     'index' = 'xxx',
>>     'sink.bulk-flush.max-actions' = '10000',
>>     'sink.bulk-flush.max-size' = '2mb',
>>     'sink.flush-on-checkpoint' = 'true',
>>     'connection.max-retry-timeout' = '60s',
>>     'failure-handler' = 'retry-rejected',
>>     'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
>>     'sink.bulk-flush.interval' = '2s'
>> );
>>
>> --
>> Best wishes,
>> - Kai
>
>
>
> --
> Best wishes,
> - Kai

Re: Elasticsearch sink connector timeout

Jacky Yin 殷传旺
With the Flink Elasticsearch connector 6.*, you can set the socket timeout by implementing a custom RestClientFactory. Here is a code snippet:

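    // Needs org.apache.http.client.config.RequestConfig and
    // org.elasticsearch.client.RestClientBuilder on the classpath.
    // Both members live inside your RestClientFactory implementation;
    // requestTimeout is an int field of that class (socket timeout in milliseconds).
    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        restClientBuilder
                .setRequestConfigCallback(new ElasticSearchRequestConfigCallback());
    }

    // Applies the configured socket timeout to the requests sent by the REST client.
    class ElasticSearchRequestConfigCallback implements RestClientBuilder.RequestConfigCallback {
        @Override
        public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
            return builder.setSocketTimeout(requestTimeout);
        }
    }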


The default socket timeout is 30 seconds, which should be fine for most cases. So if the ES load looks normal but responses are slow or the number of rejected requests is high, you should probably check:
1) whether there are too many concurrent ES bulk requests,
2) whether the bulk size is too large,
3) whether a single bulk request touches too many indexes.
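
For the setup described in this thread (parallelism 32, 12 data nodes, 'sink.bulk-flush.max-size' = '2mb'), points 1) and 2) can be roughly bounded as in the sketch below. It is only a back-of-the-envelope estimate: it assumes each sink subtask has at most one bulk request in flight and that requests are spread evenly across the data nodes. The class and method names are made up for illustration.

```java
class BulkLoadSketch {
    /** Worst-case bytes in flight if every subtask sends one full bulk at the same time. */
    static long maxInFlightBytes(int parallelism, long bulkMaxBytes) {
        return (long) parallelism * bulkMaxBytes;
    }

    /** Average number of simultaneous bulk requests per data node (even spread assumed). */
    static double bulksPerNode(int parallelism, int dataNodes) {
        return (double) parallelism / dataNodes;
    }

    public static void main(String[] args) {
        int parallelism = 32;                  // sink parallelism from the thread
        int dataNodes = 12;                    // ES data nodes from the thread
        long bulkMaxBytes = 2L * 1024 * 1024;  // 'sink.bulk-flush.max-size' = '2mb'

        System.out.println("max in-flight bytes: " + maxInFlightBytes(parallelism, bulkMaxBytes));
        System.out.println("bulks per data node: " + bulksPerNode(parallelism, dataNodes));
    }
}
```

If these upper bounds look small for the cluster, the number of indexes touched per bulk (point 3) or the network path is the more likely place to look.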

 
BR,
Jacky

From: Yangze Guo <[hidden email]>
Sent: June 7, 2021 11:41
To: Kai Fu <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: Elasticsearch sink connector timeout
 
Hi, Kai,

I think the exception is thrown from
RetryRejectedExecutionFailureHandler because you configured
'failure-handler' as 'retry-rejected'. That handler retries actions that
failed with EsRejectedExecutionException and rethrows all other failures.

AFAIK, there is no way to configure the connection/socket timeout in the
Elasticsearch SQL connector. However, if the root cause is network
jitter, you may increase sink.bulk-flush.backoff.delay and
sink.bulk-flush.backoff.max-retries.
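
To get a feel for how sink.bulk-flush.backoff.delay and sink.bulk-flush.backoff.max-retries interact, the sketch below lists the waits and their sum under a simplified exponential model in which the delay doubles on every retry. The doubling rule, the class name and the sample values (1000 ms, 3 retries) are assumptions for illustration; the connector's actual backoff schedule may differ.

```java
import java.util.ArrayList;
import java.util.List;

class BackoffSketch {
    /** Wait before each retry, assuming the delay doubles on every attempt. */
    static List<Long> delaysMillis(long initialDelayMillis, int maxRetries) {
        List<Long> delays = new ArrayList<>();
        long delay = initialDelayMillis;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            delays.add(delay);
            delay *= 2; // simplified model, not necessarily the connector's formula
        }
        return delays;
    }

    /** Total time spent waiting if every retry is used up. */
    static long totalDelayMillis(long initialDelayMillis, int maxRetries) {
        long total = 0;
        for (long d : delaysMillis(initialDelayMillis, maxRetries)) {
            total += d;
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical values for backoff.delay (ms) and backoff.max-retries.
        System.out.println(delaysMillis(1000L, 3));
        System.out.println(totalDelayMillis(1000L, 3));
    }
}
```

Under this model, raising the retry count grows the total wait much faster than raising the initial delay, so it is worth checking that the total still fits within the checkpoint timeout.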


Best,
Yangze Guo

On Sat, Jun 5, 2021 at 2:28 PM Kai Fu <[hidden email]> wrote:
>
> After some investigation of the task manager's log, the exception appears to be raised from the RetryRejectedExecutionFailureHandler path. The related logs are shown below; I'm not sure why that is.
>
>
> 5978 2021-06-05 05:31:31,529 INFO org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler [] - Bulk request 1033 has been cancelled.
> 5979 java.lang.InterruptedException: null
> 5980 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[?:1.8.0_272]
> 5981 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[?:1.8.0_272]
> 5982 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) ~[?:1.8.0_272]
> 5983 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5984 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5985 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5986 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5987 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5988 at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5989 at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 5990 at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 5991 at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 5992 at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840) [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5993 at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753) [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5994 at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659) [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5995 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5996 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5997 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5998 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]
> 5999 2021-06-05 05:31:31,530 ERROR org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler [] - Failed Elasticsearch item request: null
> 6000 java.lang.InterruptedException: null
> 6001 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[?:1.8.0_272]
> 6002 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[?:1.8.0_272]
> 6003 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) ~[?:1.8.0_272]
> 6004 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6005 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6006 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6007 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6008 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6009 at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6010 at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 6011 at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 6012 at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 6013 at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840) [flink-dist_2.11-1.13.1.jar:1.13.1]
> 6014 at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753) [flink-dist_2.11-1.13.1.jar:1.13.1]
> 6015 at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659) [flink-dist_2.11-1.13.1.jar:1.13.1]
> 6016 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) [flink-dist_2.11-1.13.1.jar:1.13.1]
> 6017 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) [flink-dist_2.11-1.13.1.jar:1.13.1]
>
> 6030 2021-06-05 05:31:31,633 ERROR org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler [] - Failed Elasticsearch item request: Connection closed unexpectedly
> 6031 org.apache.flink.elasticsearch7.shaded.org.apache.http.ConnectionClosedException: Connection closed unexpectedly
> 6032 at org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.closed(HttpAsyncRequestExecutor.java:146) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6033 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:71) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6034 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:39) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6035 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.disconnected(AbstractIODispatch.java:100) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6036 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionClosed(BaseIOReactor.java:277) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6037 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processClosedSessions(AbstractIOReactor.java:449) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6038 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:283) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6039 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6040 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6041 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]
>
> On Sat, Jun 5, 2021 at 12:13 PM Kai Fu <[hidden email]> wrote:
>>
>> Hi team,
>>
>> We are hitting ES sink connector timeouts quite frequently. As far as we can tell, the ES cluster is far from loaded (~40% CPU utilization, no queries, and a low index rate). We're using the ES-7 connector, with 12 data nodes and a parallelism of 32.
>>
>> The error log is below. We would like to know whether there is any way to locate the issue or configure the timeout parameter.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/
>>
>> 2021-06-05 11:49:10
>> java.lang.RuntimeException: An error occurred in ElasticsearchSink.
>>     at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:427)
>>     at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:432)
>>     at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:329)
>>     at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>     at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
>>     at org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnChangelog(DeduplicateFunctionHelper.java:112)
>>     at org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:80)
>>     at org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:32)
>>     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-21 [ACTIVE]
>>     at org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
>>     at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
>>     at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
>>     at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
>>     at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261)
>>     at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502)
>>     at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211)
>>     at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
>>     at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
>>     at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
>>     ... 1 more
>>
>> Config:
>> WITH (
>>     'connector' = 'elasticsearch-7',
>>     'hosts' = 'https://xxx:443',
>>     'index' = 'xxx',
>>     'sink.bulk-flush.max-actions' = '10000',
>>     'sink.bulk-flush.max-size' = '2mb',
>>     'sink.flush-on-checkpoint' = 'true',
>>     'connection.max-retry-timeout' = '60s',
>>     'failure-handler' = 'retry-rejected',
>>     'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
>>     'sink.bulk-flush.interval' = '2s'
>> );
>>
>> --
>> Best wishes,
>> - Kai
>
>
>
> --
> Best wishes,
> - Kai