ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler


ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

Averell
Hello,

I am trying to follow this Flink guide [1] to handle errors in
ElasticSearchSink by re-adding the failed messages to the queue.
The error scenarios I get and want to retry are: (i) a conflict in the
UpdateRequest document version, and (ii) a lost connection to ElasticSearch.
Both errors are expected to be transient: (i) should be solved by changing
the version, and (ii) should be gone after a few seconds.
What I expect is that the failed message gets retried successfully.
What I actually get is that Flink seems to get stuck on that (first) retry:
my flow queues up (backpressure is 1 everywhere) and all processing hangs.

Here is my error handling code:

<code>
private object MyElasticSearchFailureHandler extends ActionRequestFailureHandler {
  override def onFailure(actionRequest: ActionRequest, failure: Throwable,
                         restStatusCode: Int, indexer: RequestIndexer): Unit = {
    if (ExceptionUtils.findThrowableWithMessage(failure, "version_conflict_engine_exception") != Optional.empty()) {
      actionRequest match {
        case s: UpdateRequest =>
          LOG.warn(s"Failed inserting record to ElasticSearch due to version conflict (${s.version()}). Retrying")
          LOG.warn(actionRequest.toString)
          indexer.add(s.version(s.version() + 1))
        case _ =>
          LOG.error("Failed inserting record to ElasticSearch due to version conflict. However, this is not an Update-Request. Don't know why.")
          LOG.error(actionRequest.toString)
          throw failure
      }
    } else if (restStatusCode == -1 && failure.getMessage.contains("Connection closed")) {
      LOG.warn(s"Retrying record: ${actionRequest.toString}")
      actionRequest match {
        case s: UpdateRequest => indexer.add(s)
        case s: IndexRequest  => indexer.add(s)
      }
    } else {
      LOG.error(s"ELASTICSEARCH FAILED:\n    statusCode $restStatusCode\n    message: ${failure.getMessage}\n${failure.getStackTrace}")
      LOG.error(s"    DATA:\n    ${actionRequest.toString}")
      throw failure
    }
  }
}
</code>

Here is an extract from my task-manager logs:

2019-02-09 04:12:35.676 [I/O dispatcher 25] ERROR o.a.f.s.connectors.elasticsearch.ElasticsearchSinkBase  - Failed Elasticsearch bulk request: Connection closed
2019-02-09 04:12:35.678 [I/O dispatcher 25] WARN c.n.c......sink.MyElasticSearchSink$  - Retrying record: update {[idx-20190208][_doc][doc_id_1549622700000], doc_as_upsert[true], doc[index {*[null][null][null]*, source[{...}]}], scripted_upsert[false], detect_noop[true]}
2019-02-09 04:12:54.242 [Sink: S3 - Historical (1/4)] INFO o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 checkpointing for checkpoint with id=24 (max part counter=26).

And job-manager logs:
2019-02-09 03:59:37.880 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 23 for job 1a1438ca23387c4ef9a59ff9da6dafa1 (430392865 bytes in 307078 ms).
2019-02-09 04:09:30.970 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 24 @ 1549685370776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.
2019-02-09 04:17:00.970 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 24 of job 1a1438ca23387c4ef9a59ff9da6dafa1 expired before completing.
2019-02-09 04:24:31.035 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 25 @ 1549686270776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.
2019-02-09 04:32:01.035 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 25 of job 1a1438ca23387c4ef9a59ff9da6dafa1 expired before completing.
2019-02-09 04:39:30.961 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 26 @ 1549687170776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.

Thanks and best regards,
Averell

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html#handling-failing-elasticsearch-requests




Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

Tzu-Li (Gordon) Tai
Hi Averell,

This seems to be the bug that you encountered: https://issues.apache.org/jira/browse/FLINK-11046.

Cheers,
Gordon


Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

Averell
Thank you Gordon.

That's exactly my problem. I will try the fix in 1.7.2 now.

Thanks and regards,
Averell




Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

Tzu-Li (Gordon) Tai
Thanks for testing it out.
Will be great to get your feedback on whether the release candidate for 1.7.2 fixes this for you.


Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

Averell
Hi Gordon,

Sorry for a noob question: how can I get the 1.7.2 RC build, or the source code to build it myself?
I could not find any such branch on GitHub.

Thanks and regards,
Averell




Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

Ken Krugler
Hi Averell,

You can get release candidates from the Apache release-candidate Maven repository. For 1.7.2, I think it's in:


So just edit your pom.xml to add this repo to the <repositories> section.
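
If you happen to build with sbt rather than Maven, I believe the equivalent is a resolver entry along these lines (the staging repository id below is just a placeholder; the actual URL is listed in the RC announcement):

<code>
// build.sbt (sketch only): replace orgapacheflink-XXXX with the actual staging
// repository id from the 1.7.2 release-candidate announcement.
resolvers += "Apache Flink 1.7.2 RC staging" at "https://repository.apache.org/content/repositories/orgapacheflink-XXXX/"
</code>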

— Ken


--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

Averell
Hi Ken,

Thanks for that. But I could not find the changes from the pull request Gordon
mentioned in the repository you pointed me to (e.g. the new class
ElasticsearchFailureHandlerIndexer).
I also found this folder,
https://dist.apache.org/repos/dist/dev/flink/flink-1.7.2-rc1/, but it doesn't
contain that new class either.
Maybe Gordon meant 1.7.2 RC2?

Thanks and regards,
Averell




Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

Ken Krugler
Hi Averell,


…is the commit where Gordon made his changes for FLINK-11046.

The ElasticsearchFailureHandlerIndexer class was removed as part of the commit.

— Ken



--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

Tzu-Li (Gordon) Tai
Hi,

[hidden email] I renamed the `ElasticsearchFailureHandlerIndexer` to be `BufferingNoOpRequestIndexer`, which explains why you can't find it.

The voting thread for RC#1 of 1.7.2 can be found at [1].
The actual commits that fix the problem are d9c45af to 2f52227.

Cheers,
Gordon

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-Apache-Flink-1-7-2-release-candidate-1-td26882.html



Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

Averell
Thank you Gordon and Ken.

My Flink job is now running well with 1.7.2 RC1, and the failed ES requests
are retried successfully.

One more question on this: how can I limit the number of retries for
different types of errors in ES bulk requests? Is there any guideline on
that?

My temporary solution is to use the version field of each ES request,
incrementing it every time I re-add the request to the queue. This has worked
for me so far, but it doesn't feel right.
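
Roughly, this is the kind of handler I have in mind (just a sketch, not tested; maxRetries is an arbitrary placeholder, and it abuses the UpdateRequest version field as the attempt counter, as described above):

<code>
import org.apache.flink.streaming.connectors.elasticsearch.{ActionRequestFailureHandler, RequestIndexer}
import org.elasticsearch.action.ActionRequest
import org.elasticsearch.action.update.UpdateRequest
import org.slf4j.LoggerFactory

// Sketch only: gives up after a fixed number of attempts, re-using the
// UpdateRequest version field as the retry counter. Only UpdateRequests are
// retried; anything else (or a request over the limit) is rethrown, failing the job.
private object BoundedRetryFailureHandler extends ActionRequestFailureHandler {
  private val maxRetries = 5 // arbitrary placeholder value
  private val LOG = LoggerFactory.getLogger(getClass)

  override def onFailure(actionRequest: ActionRequest, failure: Throwable,
                         restStatusCode: Int, indexer: RequestIndexer): Unit = {
    actionRequest match {
      case u: UpdateRequest if u.version() < maxRetries =>
        LOG.warn(s"Retrying request (attempt ${u.version() + 1} of $maxRetries): $u")
        indexer.add(u.version(u.version() + 1))
      case _ =>
        LOG.error(s"Giving up on request after $maxRetries attempts: $actionRequest")
        throw failure
    }
  }
}
</code>

The obvious downside is that the version field then no longer reflects the real document version, which is why it doesn't feel like the right long-term answer.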

Thanks and regards,
Averell


