Flink Elasticsearch upsert document in ES

ApoorvK
Team,
Presently I have added Elasticsearch as a sink to a stream and am inserting the
JSON data. The problem is that when I restore the application after a crash, it
reprocesses the data in between (meanwhile, a backend application updates the
document in ES), so Flink re-inserts the document in ES and all updates to ES
are lost.

I am looking for either an update (with an insert if the document is not found),
or an insert that is skipped if the document is already there.
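The failure mode described above can be reproduced without a cluster. The sketch below is a hypothetical in-memory stand-in for an index (a `mutable.Map` from document id to JSON source; the id `k1_42` and the JSON values are made up). It replays one record after a backend update under two write semantics: always overwrite, which is Elasticsearch's default for indexing, and write-only-if-absent, which is what the original post asks for.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for an ES index: doc id -> JSON source.
// It models only the two write semantics discussed in this thread.
object ReplaySimulation {
  // Overwrite semantics: the write always replaces whatever is stored.
  def indexOverwrite(store: mutable.Map[String, String], id: String, doc: String): Unit =
    store(id) = doc

  // Create semantics: write only if the id is absent; report whether it was written.
  def createIfAbsent(store: mutable.Map[String, String], id: String, doc: String): Boolean =
    if (store.contains(id)) false
    else { store(id) = doc; true }

  // 1. Flink writes the doc, 2. a backend app updates it,
  // 3. Flink replays the same record after restoring from a checkpoint.
  def run(write: (mutable.Map[String, String], String, String) => Any): String = {
    val store = mutable.Map.empty[String, String]
    write(store, "k1_42", """{"status":"new"}""")
    store("k1_42") = """{"status":"shipped"}""" // backend update
    write(store, "k1_42", """{"status":"new"}""") // replay after restore
    store("k1_42")
  }

  def main(args: Array[String]): Unit = {
    println(run(indexOverwrite)) // prints {"status":"new"}: backend update lost
    println(run(createIfAbsent)) // prints {"status":"shipped"}: backend update kept
  }
}
```

The replayed record is identical to the first write, so only the write semantics decide whether the backend's change survives.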



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Flink Elasticsearch upsert document in ES

Itamar Syn-Hershko
Hi ApoorvK,

Elasticsearch supports "create" mode while indexing. By default, indexing overwrites any existing document with the same ID, but you can tell ES to refuse to overwrite it. See op_type in https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#docs-index-api-query-params .

Looking at the Elasticsearch Sink, it doesn't seem like it's implemented currently, but it should be relatively easy to add.
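For reference, this is roughly what create mode looks like at the REST level. The helper below is a hypothetical sketch that only formats request lines (it sends nothing); the index name `orders` and id `k1_42` are made up, and it assumes the index, type and id are already URL-safe. The first form is the `op_type=create` query parameter from the page linked above, with a mapping type as used later in this thread (ES 6.x style); the second is the typeless `_create` endpoint available in ES 7.x.

```scala
// Hypothetical helper that only formats request lines; it sends nothing.
// Assumes index, type and id are already URL-safe (no escaping is done).
object CreateRequestLine {
  // With a mapping type (ES 6.x style): op_type=create makes ES reject
  // the request if a document with this id already exists.
  def withType(index: String, docType: String, id: String): String =
    s"PUT /$index/$docType/$id?op_type=create"

  // Typeless form in ES 7.x, with the same create-only semantics.
  def typeless(index: String, id: String): String =
    s"PUT /$index/_create/$id"

  def main(args: Array[String]): Unit = {
    println(withType("orders", "_doc", "k1_42")) // PUT /orders/_doc/k1_42?op_type=create
    println(typeless("orders", "k1_42"))         // PUT /orders/_create/k1_42
  }
}
```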

In reply to ApoorvK (Mon, Feb 10, 2020 at 9:26 AM).


--

Itamar Syn-Hershko
CTO, Founder

[hidden email]
https://bigdataboutique.com

Re: Flink Elasticsearch upsert document in ES

ApoorvK
I have tried setting opType on the Elasticsearch index request. I get a "document already exists" error message on my console, but the value in Elasticsearch is still updated.

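// Assumes json4s `write` (implicit Formats in scope) and a Flink ParameterTool `parameter`.
import org.elasticsearch.action.DocWriteRequest.OpType
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import org.elasticsearch.common.xcontent.XContentType
import org.json4s.jackson.Serialization.write
val jsonString = write(record)
val rqst: IndexRequest = Requests.indexRequest()
  .index(parameter.get("esIndexName"))
  .`type`(parameter.get("esIndexType"))
  .id(record.getApi_key + "_" + record.getOrder_id) // deterministic id: replays hit the same doc
  .source(jsonString, XContentType.JSON)
  // CREATE: Elasticsearch rejects the write if a document with this id already exists.
  .opType(OpType.CREATE)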
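With `OpType.CREATE`, Elasticsearch answers a request for an id that already exists with HTTP 409 (a version conflict whose message reads "document already exists") and leaves the stored document as it was. On replay that error is therefore the expected outcome, not a job failure. The sketch below shows one way to separate those expected conflicts from real failures; it is pure Scala over a hypothetical `ItemResult` type, and the ids and statuses in `main` are made up. In a real job this decision would live in the sink's failure handler (in the Flink 1.x Elasticsearch connectors, `ActionRequestFailureHandler.onFailure` receives the REST status code), which should be checked against the connector version in use.

```scala
// Hypothetical per-document result of a bulk request: the document id and
// the HTTP status Elasticsearch returned for that item.
final case class ItemResult(id: String, status: Int)

object ConflictTriage {
  // 409 Conflict: for op_type=create the id already exists and the stored
  // document was not modified, which is the desired outcome on replay.
  val Conflict = 409

  /** Splits failed items into expected create-conflicts and everything else. */
  def triage(failures: Seq[ItemResult]): (Seq[ItemResult], Seq[ItemResult]) =
    failures.partition(_.status == Conflict)

  def main(args: Array[String]): Unit = {
    val failures = Seq(
      ItemResult("k1_42", 409), // replayed record, already in the index
      ItemResult("k1_43", 429), // rejected under load (too many requests)
      ItemResult("k1_44", 400)  // malformed document: a genuine error
    )
    val (alreadyExists, real) = triage(failures)
    println(s"ignored: ${alreadyExists.map(_.id)}") // ignored: List(k1_42)
    println(s"to handle: ${real.map(_.id)}")        // to handle: List(k1_43, k1_44)
  }
}
```

Only 409 is swallowed here; every other status still surfaces, so genuine failures are not hidden by the replay handling.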

In reply to Itamar Syn-Hershko (Mon, Feb 10, 2020 at 1:42 PM).