Can I make an Elasticsearch Sink effectively exactly once?


Stephen Connolly
From how I understand it:


the Flink Elasticsearch Sink guarantees at-least-once delivery of action requests to Elasticsearch clusters. It does so by waiting for all pending action requests in the BulkProcessor at the time of checkpoints. This effectively assures that all requests before the checkpoint was triggered have been successfully acknowledged by Elasticsearch, before proceeding to process more records sent to the sink.

So I am thinking:

  • If I put a .map(json -> json.set("_id", ElasticsearchId.generate())) in front of the Elasticsearch sink
  • If I have an ActionRequestFailureHandler that drops any ID conflicts on the floor

Would this give me exactly-once output to Elasticsearch, as the BulkProcessor's checkpoint would include the "_id" and thus, in the event of a recovery, the duplicates would be detected?
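For concreteness, the deterministic-id part could look something like this. A sketch only: `ElasticsearchId` and its `generate` method are hypothetical names standing in for whatever pure function of the event's key fields you choose (here, a SHA-256 hash), not a real Flink or Elasticsearch API.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: derives a stable Elasticsearch "_id" from the
// event's identifying fields, so a replayed event produces the same id.
public class ElasticsearchId {
    public static String generate(String... keyFields) {
        try {
            MessageDigest sha = MessageDigest.getInstance("SHA-256");
            for (String field : keyFields) {
                sha.update(field.getBytes(StandardCharsets.UTF_8));
                sha.update((byte) 0); // separator so ("ab","c") != ("a","bc")
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : sha.digest()) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-256 is always present
        }
    }

    public static void main(String[] args) {
        // Same inputs always yield the same id; different inputs differ.
        String a = generate("order-42", "2019-02-21T14:26:00Z");
        String b = generate("order-42", "2019-02-21T14:26:00Z");
        String c = generate("order-43", "2019-02-21T14:26:00Z");
        System.out.println(a.equals(b)); // true
        System.out.println(a.equals(c)); // false
    }
}
```

The important property is that the id is a function of the event alone (no random UUIDs, no sink-side counters), so a replay after recovery regenerates identical ids.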

Or is there some additional concern I need to be aware of?

Thanks

-stephenc
Re: Can I make an Elasticsearch Sink effectively exactly once?

Dawid Wysakowicz

Hi,

The procedure you described will not give you exactly-once semantics. What the cited excerpt means is that a checkpoint will not be considered finished until all pending requests have been acknowledged. It does not mean that those requests are stored on the Flink side. That said, if an error occurs before those requests are acknowledged, the job will be recovered from the previous successful checkpoint. Nevertheless, some of the requests that "belong" to the current checkpoint might already have been sent to ES by that time; that is where the "at-least-once" delivery comes from.

If you do have a deterministic way of generating the ElasticsearchId, though, these semantics should be enough for you. Any duplicates (by id) will simply be updated on the ES side.
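A toy illustration of why deterministic ids make the replay harmless: modeling the ES index as a plain map keyed by "_id" (not real client code), re-sending the same documents after recovery overwrites rather than duplicates.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of an ES index keyed by "_id": indexing the same id twice
// just overwrites the document, so replayed requests are idempotent.
public class IdempotentReplayDemo {
    static Map<String, String> index = new HashMap<>();

    // Models the sink flushing one index action request.
    static void indexDoc(String id, String doc) {
        index.put(id, doc);
    }

    public static void main(String[] args) {
        // First attempt: the flush reaches ES, but the job fails before
        // the checkpoint completes.
        indexDoc("id-1", "{\"user\":\"stephenc\"}");
        indexDoc("id-2", "{\"user\":\"dawid\"}");

        // Recovery replays the same records; deterministic ids mean the
        // re-sent requests update existing documents instead of adding new ones.
        indexDoc("id-1", "{\"user\":\"stephenc\"}");
        indexDoc("id-2", "{\"user\":\"dawid\"}");

        System.out.println(index.size()); // prints 2, not 4
    }
}
```

With random ids the second round of writes would have produced four documents; that is the difference between at-least-once delivery and effectively-exactly-once output.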

Best,

Dawid

On 21/02/2019 14:26, Stephen Connolly wrote:
