Hi team,
We encountered an issue about ES sink connector timeout quite frequently. As checked the ES cluster is far from being loaded(~40% CPU utilization, no query, index rate is also low). We're using ES-7 connector, with 12 data nodes and parallelism of 32. The error log is as below, we want to know if there is any way to locate the issue or configure the timeout parameter. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/ 2021-06-05 11:49:10 java.lang.RuntimeException: An error occurred in ElasticsearchSink. at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:427) at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:432) at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:329) at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50) at org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnChangelog(DeduplicateFunctionHelper.java:112) at org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:80) at org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:32) at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at java.lang.Thread.run(Thread.java:748) Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-21 [ACTIVE] at org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387) at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92) at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39) at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175) at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261) at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502) at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211) at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280) at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) ... 1 more Config: WITH ('connector' = 'elasticsearch-7', 'hosts' = 'https://xxx:443', 'index' = 'xxx', 'sink.bulk-flush.max-actions' = '10000', 'sink.bulk-flush.max-size' = '2mb', 'sink.flush-on-checkpoint' = 'true', 'connection.max-retry-timeout' = '60s', 'failure-handler' = 'retry-rejected', 'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL', 'sink.bulk-flush.interval' = '2s' ); Best wishes, - Kai |
With some investigation in the task manager's log, the exception was raised from RetryRejectedExecutionFailureHandler path, the related logs are showing below, not sure why it's that. 5978 2021-06-05 05:31:31,529 INFO org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler [] - Bulk request 1033 has been cancelled. 5979 java.lang.InterruptedException: null 5980 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[?:1.8.0_272] 5981 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[?:1.8.0_272] 5982 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) ~[?:1.8.0_272] 5983 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar: 1.13.1] 5984 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 5985 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 5986 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13. 1] 5987 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 5988 at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 5989 at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1] 5990 at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.13.1.jar:1.13.1] 5991 at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861) ~[flink-dist_2.11-1.13.1.jar:1.13.1] 5992 at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840) [flink-dist_2.11-1.13.1.jar:1.13.1] 5993 at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753) [flink-dist_2.11-1.13.1.jar:1.13.1] 5994 at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659) [flink-dist_2.11-1.13.1.jar:1.13.1] 5995 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) [flink-dist_2.11-1.13.1.jar:1.13.1] 5996 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) [flink-dist_2.11-1.13.1.jar:1.13.1] 5997 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [flink-dist_2.11-1.13.1.jar:1.13.1] 5998 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272] 5999 2021-06-05 05:31:31,530 ERROR org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler [] - Failed Elasticsearch item request: null 6000 java.lang.InterruptedException: null 6001 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[?:1.8.0_272] 6002 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[?:1.8.0_272] 6003 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) ~[?:1.8.0_272] 6004 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar: 1.13.1] 6005 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 6006 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 6007 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13. 1] 6008 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 6009 at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 6010 at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1] 6011 at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.13.1.jar:1.13.1] 6012 at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861) ~[flink-dist_2.11-1.13.1.jar:1.13.1] 6013 at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840) [flink-dist_2.11-1.13.1.jar:1.13.1] 6014 at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753) [flink-dist_2.11-1.13.1.jar:1.13.1] 6015 at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659) [flink-dist_2.11-1.13.1.jar:1.13.1] 6016 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) [flink-dist_2.11-1.13.1.jar:1.13.1] 6017 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) [flink-dist_2.11-1.13.1.jar:1.13.1] 6030 2021-06-05 05:31:31,633 ERROR org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler [] - Failed Elasticsearch item request: Connection closed unexpectedly 6031 org.apache.flink.elasticsearch7.shaded.org.apache.http.ConnectionClosedException: Connection closed unexpectedly 6032 at org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.closed(HttpAsyncRequestExecutor.java:146) [flink-sql-connector-elasticsearch7_2.11- 1.13.1.jar:1.13.1] 6033 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:71) [flink-sql-connector-elasticsearch7_2.11-1.13.1. jar:1.13.1] 6034 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:39) [flink-sql-connector-elasticsearch7_2.11-1.13.1. jar:1.13.1] 6035 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.disconnected(AbstractIODispatch.java:100) [flink-sql-connector-elasticsearch7_2.11-1. 13.1.jar:1.13.1] 6036 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionClosed(BaseIOReactor.java:277) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar: 1.13.1] 6037 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processClosedSessions(AbstractIOReactor.java:449) [flink-sql-connector- elasticsearch7_2.11-1.13.1.jar:1.13.1] 6038 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:283) [flink-sql-connector-elasticsearch7_2.11-1.13.1. jar:1.13.1] 6039 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 6040 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) [flink-sql-connector- elasticsearch7_2.11-1.13.1.jar:1.13.1] 6041 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272] On Sat, Jun 5, 2021 at 12:13 PM Kai Fu <[hidden email]> wrote:
Best wishes, - Kai |
Hi, Kai,
I think the exception should be thrown from RetryRejectedExecutionFailureHandler as you configure the 'failure-handler' to 'retry-rejected'. It will retry the action that failed with EsRejectedExecutionException and throw all other failures. AFAIK, there is no way to configure the connection/socket timeout in Elasticsearch SQL connector. However, if the root cause is a network jitter, you may increase the sink.bulk-flush.backoff.delay and the sink.bulk-flush.backoff.max-retries. Best, Yangze Guo On Sat, Jun 5, 2021 at 2:28 PM Kai Fu <[hidden email]> wrote: > > With some investigation in the task manager's log, the exception was raised from RetryRejectedExecutionFailureHandler path, the related logs are showing below, not sure why it's that. > > > 5978 2021-06-05 05:31:31,529 INFO org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler [] - Bulk request 1033 has been cancelled. > 5979 java.lang.InterruptedException: null > 5980 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[?:1.8.0_272] > 5981 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[?:1.8.0_272] > 5982 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) ~[?:1.8.0_272] > 5983 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar: 1.13.1] > 5984 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] > 5985 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] > 5986 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13. 1] > 5987 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] > 5988 at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] > 5989 at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1] > 5990 at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.13.1.jar:1.13.1] > 5991 at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861) ~[flink-dist_2.11-1.13.1.jar:1.13.1] > 5992 at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840) [flink-dist_2.11-1.13.1.jar:1.13.1] > 5993 at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753) [flink-dist_2.11-1.13.1.jar:1.13.1] > 5994 at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659) [flink-dist_2.11-1.13.1.jar:1.13.1] > 5995 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) [flink-dist_2.11-1.13.1.jar:1.13.1] > 5996 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) [flink-dist_2.11-1.13.1.jar:1.13.1] > 5997 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [flink-dist_2.11-1.13.1.jar:1.13.1] > 5998 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272] > 5999 2021-06-05 05:31:31,530 ERROR org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler [] - Failed Elasticsearch item request: null > 6000 java.lang.InterruptedException: null > 6001 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[?:1.8.0_272] > 6002 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[?:1.8.0_272] > 6003 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) ~[?:1.8.0_272] > 6004 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar: 1.13.1] > 6005 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] > 6006 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] > 6007 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13. 1] > 6008 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] > 6009 at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] > 6010 at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1] > 6011 at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.13.1.jar:1.13.1] > 6012 at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861) ~[flink-dist_2.11-1.13.1.jar:1.13.1] > 6013 at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840) [flink-dist_2.11-1.13.1.jar:1.13.1] > 6014 at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753) [flink-dist_2.11-1.13.1.jar:1.13.1] > 6015 at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659) [flink-dist_2.11-1.13.1.jar:1.13.1] > 6016 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) [flink-dist_2.11-1.13.1.jar:1.13.1] > 6017 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) [flink-dist_2.11-1.13.1.jar:1.13.1] > > 6030 2021-06-05 05:31:31,633 ERROR org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler [] - Failed Elasticsearch item request: Connection closed unexpectedly > 6031 org.apache.flink.elasticsearch7.shaded.org.apache.http.ConnectionClosedException: Connection closed unexpectedly > 6032 at org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.closed(HttpAsyncRequestExecutor.java:146) [flink-sql-connector-elasticsearch7_2.11- 1.13.1.jar:1.13.1] > 6033 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:71) [flink-sql-connector-elasticsearch7_2.11-1.13.1. jar:1.13.1] > 6034 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:39) [flink-sql-connector-elasticsearch7_2.11-1.13.1. jar:1.13.1] > 6035 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.disconnected(AbstractIODispatch.java:100) [flink-sql-connector-elasticsearch7_2.11-1. 13.1.jar:1.13.1] > 6036 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionClosed(BaseIOReactor.java:277) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar: 1.13.1] > 6037 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processClosedSessions(AbstractIOReactor.java:449) [flink-sql-connector- elasticsearch7_2.11-1.13.1.jar:1.13.1] > 6038 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:283) [flink-sql-connector-elasticsearch7_2.11-1.13.1. jar:1.13.1] > 6039 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] > 6040 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) [flink-sql-connector- elasticsearch7_2.11-1.13.1.jar:1.13.1] > 6041 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272] > > On Sat, Jun 5, 2021 at 12:13 PM Kai Fu <[hidden email]> wrote: >> >> Hi team, >> >> We encountered an issue about ES sink connector timeout quite frequently. As checked the ES cluster is far from being loaded(~40% CPU utilization, no query, index rate is also low). We're using ES-7 connector, with 12 data nodes and parallelism of 32. >> >> The error log is as below, we want to know if there is any way to locate the issue or configure the timeout parameter. >> >> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/ >> >> 2021-06-05 11:49:10 >> java.lang.RuntimeException: An error occurred in ElasticsearchSink. >> at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:427) >> at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:432) >> at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:329) >> at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65) >> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) >> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) >> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) >> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) >> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) >> at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) >> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) >> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) >> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) >> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) >> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) >> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50) >> at org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnChangelog(DeduplicateFunctionHelper.java:112) >> at org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:80) >> at org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:32) >> at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) >> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) >> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) >> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) >> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) >> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) >> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) >> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) >> at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) >> at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) >> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) >> at java.lang.Thread.run(Thread.java:748) >> Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-21 [ACTIVE] >> at org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387) >> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92) >> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39) >> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175) >> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261) >> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502) >> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211) >> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280) >> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) >> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) >> ... 1 more >> >> Config: >> WITH ( >> 'connector' = 'elasticsearch-7', >> 'hosts' = '<a href="https://xxx:443'">https://xxx:443', >> 'index' = 'xxx', >> 'sink.bulk-flush.max-actions' = '10000', >> 'sink.bulk-flush.max-size' = '2mb', >> 'sink.flush-on-checkpoint' = 'true', >> 'connection.max-retry-timeout' = '60s', >> 'failure-handler' = 'retry-rejected', >> 'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL', >> 'sink.bulk-flush.interval' = '2s' >> ); >> >> -- >> Best wishes, >> - Kai > > > > -- > Best wishes, > - Kai |
In flink-es connector 6.*, you can set the socket timeout by implementing a customized RestClientFactory。 Here is the code snippet.
@Override
public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
restClientBuilder
.setRequestConfigCallback(new ElasticSearchRequestConfigCallback())
}
class ElasticSearchRequestConfigCallback implements RestClientBuilder.RequestConfigCallback {
@Override
public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
return builder.setSocketTimeout(requestTimeout);
}
}
The default socket timeout is 30 seconds which should be ok for most cases. So if you find that the es load is normal, but the response is slow or the number of rejected request is high, probably you should check
1) if the concurrent count of es bulk requests is too much?
2) if the bulk size is too big?
3) if too many indexes are included in one bulk request?
BR,
Jacky
发件人: Yangze Guo <[hidden email]>
发送时间: 2021年6月7日 11:41 收件人: Kai Fu <[hidden email]> 抄送: user <[hidden email]> 主题: Re: Elasticsearch sink connector timeout Hi, Kai,
I think the exception should be thrown from RetryRejectedExecutionFailureHandler as you configure the 'failure-handler' to 'retry-rejected'. It will retry the action that failed with EsRejectedExecutionException and throw all other failures. AFAIK, there is no way to configure the connection/socket timeout in Elasticsearch SQL connector. However, if the root cause is a network jitter, you may increase the sink.bulk-flush.backoff.delay and the sink.bulk-flush.backoff.max-retries. Best, Yangze Guo On Sat, Jun 5, 2021 at 2:28 PM Kai Fu <[hidden email]> wrote: > > With some investigation in the task manager's log, the exception was raised from RetryRejectedExecutionFailureHandler path, the related logs are showing below, not sure why it's that. > > > 5978 2021-06-05 05:31:31,529 INFO org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler [] - Bulk request 1033 has been cancelled. > 5979 java.lang.InterruptedException: null > 5980 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[?:1.8.0_272] > 5981 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[?:1.8.0_272] > 5982 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) ~[?:1.8.0_272] > 5983 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar: 1.13.1] > 5984 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] > 5985 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] > 5986 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13. 1] > 5987 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] > 5988 at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] > 5989 at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1] > 5990 at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.13.1.jar:1.13.1] > 5991 at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861) ~[flink-dist_2.11-1.13.1.jar:1.13.1] > 5992 at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840) [flink-dist_2.11-1.13.1.jar:1.13.1] > 5993 at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753) [flink-dist_2.11-1.13.1.jar:1.13.1] > 5994 at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659) [flink-dist_2.11-1.13.1.jar:1.13.1] > 5995 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) [flink-dist_2.11-1.13.1.jar:1.13.1] > 5996 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) [flink-dist_2.11-1.13.1.jar:1.13.1] > 5997 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [flink-dist_2.11-1.13.1.jar:1.13.1] > 5998 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272] > 5999 2021-06-05 05:31:31,530 ERROR org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler [] - Failed Elasticsearch item request: null > 6000 java.lang.InterruptedException: null > 6001 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[?:1.8.0_272] > 6002 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[?:1.8.0_272] > 6003 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) ~[?:1.8.0_272] > 6004 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar: 1.13.1] > 6005 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] > 6006 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] > 6007 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13. 1] > 6008 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] > 6009 at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] > 6010 at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1] > 6011 at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.13.1.jar:1.13.1] > 6012 at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861) ~[flink-dist_2.11-1.13.1.jar:1.13.1] > 6013 at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840) [flink-dist_2.11-1.13.1.jar:1.13.1] > 6014 at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753) [flink-dist_2.11-1.13.1.jar:1.13.1] > 6015 at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659) [flink-dist_2.11-1.13.1.jar:1.13.1] > 6016 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) [flink-dist_2.11-1.13.1.jar:1.13.1] > 6017 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) [flink-dist_2.11-1.13.1.jar:1.13.1] > > 6030 2021-06-05 05:31:31,633 ERROR org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler [] - Failed Elasticsearch item request: Connection closed unexpectedly > 6031 org.apache.flink.elasticsearch7.shaded.org.apache.http.ConnectionClosedException: Connection closed unexpectedly > 6032 at org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.closed(HttpAsyncRequestExecutor.java:146) [flink-sql-connector-elasticsearch7_2.11- 1.13.1.jar:1.13.1] > 6033 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:71) [flink-sql-connector-elasticsearch7_2.11-1.13.1. jar:1.13.1] > 6034 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:39) [flink-sql-connector-elasticsearch7_2.11-1.13.1. jar:1.13.1] > 6035 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.disconnected(AbstractIODispatch.java:100) [flink-sql-connector-elasticsearch7_2.11-1. 13.1.jar:1.13.1] > 6036 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionClosed(BaseIOReactor.java:277) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar: 1.13.1] > 6037 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processClosedSessions(AbstractIOReactor.java:449) [flink-sql-connector- elasticsearch7_2.11-1.13.1.jar:1.13.1] > 6038 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:283) [flink-sql-connector-elasticsearch7_2.11-1.13.1. jar:1.13.1] > 6039 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] > 6040 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) [flink-sql-connector- elasticsearch7_2.11-1.13.1.jar:1.13.1] > 6041 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272] > > On Sat, Jun 5, 2021 at 12:13 PM Kai Fu <[hidden email]> wrote: >> >> Hi team, >> >> We encountered an issue about ES sink connector timeout quite frequently. As checked the ES cluster is far from being loaded(~40% CPU utilization, no query, index rate is also low). We're using ES-7 connector, with 12 data nodes and parallelism of 32. >> >> The error log is as below, we want to know if there is any way to locate the issue or configure the timeout parameter. >> >> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/ >> >> 2021-06-05 11:49:10 >> java.lang.RuntimeException: An error occurred in ElasticsearchSink. >> at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:427) >> at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:432) >> at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:329) >> at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65) >> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) >> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) >> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) >> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) >> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) >> at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) >> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) >> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) >> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) >> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) >> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) >> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50) >> at org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnChangelog(DeduplicateFunctionHelper.java:112) >> at org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:80) >> at org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:32) >> at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) >> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) >> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) >> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) >> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) >> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) >> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) >> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) >> at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) >> at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) >> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) >> at java.lang.Thread.run(Thread.java:748) >> Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-21 [ACTIVE] >> at org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387) >> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92) >> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39) >> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175) >> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261) >> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502) >> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211) >> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280) >> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) >> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) >> ... 1 more >> >> Config: >> WITH ( >> 'connector' = 'elasticsearch-7', >> 'hosts' = 'https://xxx:443', >> 'index' = 'xxx', >> 'sink.bulk-flush.max-actions' = '10000', >> 'sink.bulk-flush.max-size' = '2mb', >> 'sink.flush-on-checkpoint' = 'true', >> 'connection.max-retry-timeout' = '60s', >> 'failure-handler' = 'retry-rejected', >> 'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL', >> 'sink.bulk-flush.interval' = '2s' >> ); >> >> -- >> Best wishes, >> - Kai > > > > -- > Best wishes, > - Kai |
Free forum by Nabble | Edit this page |