Hello,
I am trying to follow this Flink guide [1] to handle errors in ElasticSearchSink by re-adding the failed messages to the queue. The error scenarios that I hit and want to retry are: (i) a conflict in the UpdateRequest document version and (ii) a lost connection to ElasticSearch. These errors are expected to be transient: (i) should be solved by changing the version, and (ii) should go away after a few seconds.

What I expected: the message gets retried successfully.
What I actually got: Flink seemed to get stuck on that (first) retry, my flow queued up (backpressure is 1 everywhere), and all processing hung.

Here is my error handling code:

<code>
import org.apache.flink.streaming.connectors.elasticsearch.{ActionRequestFailureHandler, RequestIndexer}
import org.apache.flink.util.ExceptionUtils
import org.elasticsearch.action.ActionRequest
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.action.update.UpdateRequest

private object MyElasticSearchFailureHandler extends ActionRequestFailureHandler {
  override def onFailure(actionRequest: ActionRequest, failure: Throwable,
                         restStatusCode: Int, indexer: RequestIndexer): Unit = {
    if (ExceptionUtils.findThrowableWithMessage(failure, "version_conflict_engine_exception").isPresent) {
      actionRequest match {
        case s: UpdateRequest =>
          LOG.warn(s"Failed inserting record to ElasticSearch due to version conflict (${s.version()}). Retrying")
          LOG.warn(actionRequest.toString)
          // Re-queue the request with its version bumped by one.
          indexer.add(s.version(s.version() + 1))
        case _ =>
          LOG.error("Failed inserting record to ElasticSearch due to version conflict. However, this is not an Update-Request. Don't know why.")
          LOG.error(actionRequest.toString)
          throw failure
      }
    } else if (restStatusCode == -1 && Option(failure.getMessage).exists(_.contains("Connection closed"))) {
      // Lost connection: re-queue the request unchanged. getMessage may be null, hence the Option.
      LOG.warn(s"Retrying record: ${actionRequest.toString}")
      actionRequest match {
        case s: UpdateRequest => indexer.add(s)
        case s: IndexRequest => indexer.add(s)
        case _ => throw failure // other request types are not retried (avoids a MatchError)
      }
    } else {
      // Pass the throwable to the logger so the stack trace is printed, not the array reference.
      LOG.error(s"ELASTICSEARCH FAILED:\n statusCode $restStatusCode\n message: ${failure.getMessage}", failure)
      LOG.error(s" DATA:\n ${actionRequest.toString}")
      throw failure
    }
  }
}
</code>

Here is the extract from my task-manager logs:

2019-02-09 04:12:35.676 [I/O dispatcher 25] ERROR o.a.f.s.connectors.elasticsearch.ElasticsearchSinkBase - Failed Elasticsearch bulk request: Connection closed
2019-02-09 04:12:35.678 [I/O dispatcher 25] WARN c.n.c......sink.MyElasticSearchSink$ - Retrying record: update {[idx-20190208][_doc][doc_id_1549622700000], doc_as_upsert[true], doc[index {*[null][null][null]*, source[{...}]}], scripted_upsert[false], detect_noop[true]}
2019-02-09 04:12:54.242 [Sink: S3 - Historical (1/4)] INFO o.a.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=24 (max part counter=26).

And job-manager logs:

2019-02-09 03:59:37.880 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 23 for job 1a1438ca23387c4ef9a59ff9da6dafa1 (430392865 bytes in 307078 ms).
2019-02-09 04:09:30.970 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 24 @ 1549685370776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.
2019-02-09 04:17:00.970 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 24 of job 1a1438ca23387c4ef9a59ff9da6dafa1 expired before completing.
2019-02-09 04:24:31.035 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 25 @ 1549686270776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.
2019-02-09 04:32:01.035 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 25 of job 1a1438ca23387c4ef9a59ff9da6dafa1 expired before completing.
2019-02-09 04:39:30.961 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 26 @ 1549687170776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.

Thanks and best regards,
Averell

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html#handling-failing-elasticsearch-requests

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi Averell,

This seems to be the bug that you encountered: https://issues.apache.org/jira/browse/FLINK-11046.

Cheers,
Gordon

On Sat, Feb 9, 2019 at 3:27 PM Averell <[hidden email]> wrote:
> Hello,
Thank you Gordon.
That's my exact problem. Will try the fix in 1.7.2 now.

Thanks and regards,
Averell
Thanks for testing it out. It would be great to get your feedback on whether the release candidate for 1.7.2 fixes this for you.

On Wed, Feb 13, 2019 at 7:38 PM Averell <[hidden email]> wrote:
> Thank you Gordon.
Hi Gordon,
Sorry for the noob question: how can I get the 1.7.2 RC build, or the code to build it myself? I could not find any branch like that on GitHub.

Thanks and regards,
Averell
Hi Averell,
You can get release candidates from the Apache release candidate maven repo. For 1.7.2, I think it’s in: So just edit your pom.xml to add this repo to the <repositories> section. — Ken
--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
Hi Ken,
Thanks for that. However, I could not find the changes from the pull request Gordon mentioned in the repository you gave me (e.g. the new class ElasticsearchFailureHandlerIndexer). I have found this folder https://dist.apache.org/repos/dist/dev/flink/flink-1.7.2-rc1/, but it doesn't have that new class either. Maybe Gordon meant 1.7.2 rc2?

Thanks and regards,
Averell
Hi Averell,
…is the commit where Gordon made his changes for FLINK-11046. The ElasticsearchFailureHandlerIndexer class was removed as part of the commit. — Ken
Hi, [hidden email]

I renamed `ElasticsearchFailureHandlerIndexer` to `BufferingNoOpRequestIndexer`, which explains why you can't find it. The voting thread for RC#1 of 1.7.2 can be found at [1]. The actual commits that fix the problem are d9c45af to 2f52227.

Cheers,
Gordon

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-Apache-Flink-1-7-2-release-candidate-1-td26882.html

On Thu, Feb 14, 2019 at 9:38 AM Ken Krugler <[hidden email]> wrote:
Thank you Gordon and Ken.
My Flink job is now running well with 1.7.2 RC1, with failed ES requests retried successfully.

One more question I have on this: how can I limit the number of retries for different types of errors with an ES bulk request? Is there any guideline on that? My temporary solution is to use the version field of each ES request, incrementing it every time I re-add the request to the queue. This has worked for me so far, but it doesn't look right.

Thanks and regards,
Averell
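One possible way to cap retries per error type without overloading the version field is to keep a small counter next to the failure handler. The following is only a minimal sketch under stated assumptions: `RetryBudget`, `ErrorKind`, `VersionConflict` and `ConnectionClosed` are hypothetical names introduced here for illustration, not part of Flink or Elasticsearch, and the counts live in memory only, so they are not checkpointed and reset whenever the sink restarts.

```scala
import scala.collection.mutable

// Hypothetical error categories for illustration; not part of Flink or Elasticsearch.
sealed trait ErrorKind
case object VersionConflict extends ErrorKind
case object ConnectionClosed extends ErrorKind

/** Counts retries per (request key, error kind) and enforces one limit per error kind. */
final class RetryBudget(limits: Map[ErrorKind, Int]) {
  private val attempts =
    mutable.Map.empty[(String, ErrorKind), Int].withDefaultValue(0)

  /** Records one more retry and returns true, or returns false if the limit is used up. */
  def tryAcquire(requestKey: String, kind: ErrorKind): Boolean = synchronized {
    val used = attempts((requestKey, kind))
    if (used < limits.getOrElse(kind, 0)) {
      attempts((requestKey, kind)) = used + 1
      true
    } else {
      false
    }
  }

  /** Forgets all counts for a request so the map does not grow without bound. */
  def clear(requestKey: String): Unit = synchronized {
    limits.keys.foreach(kind => attempts.remove((requestKey, kind)))
  }
}
```

Inside `onFailure`, the handler could then call something like `budget.tryAcquire(s.id(), VersionConflict)` before `indexer.add(...)` and `throw failure` once it returns false. Using the document id as the key is an assumption; any stable identifier of the request would do.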