Flink Elasticsearch upsert document in ES

I have added Elasticsearch as a sink to a stream and am inserting JSON data
into it. The problem is that when I restore the application after a crash, it
reprocesses the data in between. Meanwhile a backend application has updated
the documents in ES, so when Flink reinserts them, all of those updates to ES
are lost.

What I am trying to achieve is an update-or-insert: insert the document if it
is not found, and do not insert it again if it is already there.

I have tried setting opType on the Elasticsearch request builder. I get the
error message "document already exists" on my console, but the value in
Elasticsearch is still updated.

val jsonString = write(record)
val rqst: IndexRequest = Requests.indexRequest
  .id(record.getApi_key + "_" + record.getOrder_id)
  .source(jsonString, XContentType.JSON)

Re: Flink Elasticsearch upsert document in ES

When building the request, you should build an UpdateRequest, like the following snippet:

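import org.elasticsearch.action.update.UpdateRequest
import org.elasticsearch.common.xcontent.XContentType

val doc: String = ???                 // JSON source of the record
val targetIndex: String = ???
val indexType: Option[String] = ???
val id: String = ???                  // same id you give the IndexRequest today

// Assumes a client that still has the (index, type, id) constructor; "_doc" is only a fallback.
new UpdateRequest(targetIndex, indexType.getOrElse("_doc"), id)
  .upsert(doc, XContentType.JSON)     // inserted when no document has this id
  .doc(doc, XContentType.JSON)        // merged into the document when it exists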
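
To plug this into the sink, something along these lines might work. It is only a sketch: it assumes the connector version where you implement ElasticsearchSinkFunction and where RequestIndexer accepts an UpdateRequest, and an Elasticsearch client that still offers the (index, type, id) constructor. Order, OrderUpserts and OrderUpsertSink are made-up names standing in for your own record and classes.

```scala
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.elasticsearch.action.update.UpdateRequest
import org.elasticsearch.common.xcontent.XContentType

// Hypothetical record type standing in for the one in the question.
final case class Order(apiKey: String, orderId: String, json: String)

object OrderUpserts {
  // Same id scheme as in the question: api key and order id joined by "_".
  def documentId(order: Order): String = order.apiKey + "_" + order.orderId

  def toUpdateRequest(index: String, docType: String, order: Order): UpdateRequest =
    new UpdateRequest(index, docType, documentId(order))
      .upsert(order.json, XContentType.JSON) // used when the id does not exist yet
      .doc(order.json, XContentType.JSON)    // merged into the document when it exists
}

// Assumption: index and docType are passed in by whoever builds the sink.
class OrderUpsertSink(index: String, docType: String) extends ElasticsearchSinkFunction[Order] {
  override def process(order: Order, ctx: RuntimeContext, indexer: RequestIndexer): Unit =
    indexer.add(OrderUpserts.toUpdateRequest(index, docType, order))
}
```

The only change compared to an IndexRequest-based sink is the request type handed to the indexer; the id must stay the same so that replayed records hit the same document.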

I'm not entirely sure if you need both "doc" and "upsert" fields, as I think this depends on the Elasticsearch version you're using.
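
As far as I understand the update API, the two fields do different jobs: "upsert" is stored as-is when no document with that id exists, and "doc" is merged into the document when it does. The toy model below imitates that with plain Scala maps so you can see what a replay does. It is not Elasticsearch code: the merge here is shallow, and the field names (status, amount, note) are invented for the example.

```scala
object UpsertModel {
  type Doc = Map[String, Any]

  // Simplified model of an update-with-upsert on an in-memory "index":
  // a missing id stores `upsert` as-is, an existing id gets `doc` merged on top.
  def update(index: Map[String, Doc], id: String, doc: Doc, upsert: Doc): Map[String, Doc] =
    index.get(id) match {
      case None           => index + (id -> upsert)
      case Some(existing) => index + (id -> (existing ++ doc))
    }

  def main(args: Array[String]): Unit = {
    val fromFlink: Doc = Map("status" -> "NEW", "amount" -> 10)

    // First delivery: the id is unknown, so the upsert document is stored.
    val first = update(Map.empty, "k_1", fromFlink, fromFlink)

    // The backend changes one field and adds another.
    val afterBackend =
      first + ("k_1" -> (first("k_1") ++ Map("status" -> "PAID", "note" -> "checked")))

    // Flink replays the same record after a restore.
    val replayed = update(afterBackend, "k_1", fromFlink, fromFlink)
    println(replayed("k_1"))
  }
}
```

In this model the replay puts "status" back to the value Flink sent, while "note", which Flink never writes, survives. So using the same JSON for both fields protects only the fields that are absent from "doc"; if the backend edits fields that Flink also sends, "doc" would have to leave those fields out.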

