Flink Elasticsearch upsert document in ES

Flink Elasticsearch upsert document in ES

ApoorvK
Team,
I have added Elasticsearch as a sink to a stream and am inserting JSON data
into it. The problem is that when I restore the application after a crash, it
reprocesses the data in between (meanwhile a backend application has updated
the documents in ES), so Flink re-inserts the documents and all of those
updates in ES are lost.

I am trying to do an update-or-insert when the document is not found, or to
skip the insert when the document is already there.


I have tried setting opType on the Elasticsearch request builder. I get a
"document already exists" error message on my console, but the value in
Elasticsearch is still updated.

val jsonString = write(record)
val rqst: IndexRequest = Requests.indexRequest
  .index(parameter.get("esIndexName"))
  .`type`(parameter.get("esIndexType"))
  .id(record.getApi_key + "_" + record.getOrder_id)
  .source(jsonString, XContentType.JSON)
  .opType(OpType.CREATE)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Flink Elasticsearch upsert document in ES

ORIOL LOPEZ SANCHEZ
When building the request, you should build an UpdateRequest, like the following snippet:

import org.elasticsearch.action.update.UpdateRequest
import org.elasticsearch.common.xcontent.XContentType

val doc: String = ???                // JSON document to write
val id: String = ???                 // document id
val targetIndex: String = ???
val indexType: Option[String] = ???

new UpdateRequest()
  .index(targetIndex)
  .`type`(indexType.getOrElse("_doc"))
  .id(id)
  .upsert(doc, XContentType.JSON)    // stored when the id does not exist yet
  .doc(doc, XContentType.JSON)       // merged into an existing document
  .docAsUpsert(true)                 // use `doc` as the upsert document

I'm not entirely sure whether you need both the "doc" and "upsert" fields, as I think this depends on the Elasticsearch version you're using.
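
To make the difference between the request types concrete, here is a small in-memory sketch of how I understand the write semantics. It is only a toy model: ToyIndex, afterBackendUpdate and the field names are made up for illustration and are not part of the Elasticsearch client, and the real behaviour should be checked against the Elasticsearch version in use. It assumes an update request merges its partial document into an existing document and stores the upsert document only when the id is absent.

```scala
import scala.collection.mutable

// Toy model only: none of these names belong to the Elasticsearch client.
object WriteSemantics {
  type Doc = Map[String, String]

  final class ToyIndex {
    private val docs = mutable.Map.empty[String, Doc]

    def get(id: String): Option[Doc] = docs.get(id)

    // Index request: always replaces the whole document.
    def index(id: String, doc: Doc): Unit = docs.update(id, doc)

    // Index request with opType CREATE: rejected if the id is already present.
    def create(id: String, doc: Doc): Either[String, Unit] =
      if (docs.contains(id)) Left("document already exists")
      else { docs.update(id, doc); Right(()) }

    // Update request: merge `partial` into an existing document,
    // or store `upsert` when the id is absent.
    def update(id: String, partial: Doc, upsert: Doc): Unit = {
      val next = docs.get(id) match {
        case Some(existing) => existing ++ partial
        case None           => upsert
      }
      docs.update(id, next)
    }
  }

  // State just before Flink replays a record after a restore:
  // Flink wrote the record, then the backend changed `status` and added `note`.
  def afterBackendUpdate(record: Doc): ToyIndex = {
    val idx = new ToyIndex
    idx.index("key_1", record)
    idx.update("key_1", Map("status" -> "SHIPPED", "note" -> "checked"), Map.empty)
    idx
  }

  def main(args: Array[String]): Unit = {
    val record: Doc = Map("status" -> "NEW", "amount" -> "10")

    val a = afterBackendUpdate(record)
    a.index("key_1", record)              // replay as a plain index request
    println(a.get("key_1"))               // backend changes are gone

    val b = afterBackendUpdate(record)
    b.update("key_1", record, record)     // replay with doc + docAsUpsert
    println(b.get("key_1"))               // status is reset, note survives

    val c = afterBackendUpdate(record)
    c.update("key_1", Map.empty, record)  // replay with empty doc + upsert
    println(c.get("key_1"))               // backend changes are kept
  }
}
```

In this model, replaying the full record as "doc" still overwrites any field the backend changed, because the replayed fields win the merge. If the goal is "insert when missing, otherwise leave the document alone", the third variant (an empty "doc" together with an "upsert" document) is the one that matches, assuming the real update request behaves like the model.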


From: ApoorvK <[hidden email]>
Sent: Monday, 10 February 2020 15:33
To: [hidden email] <[hidden email]>
Subject: Flink Elasticsearch upsert document in ES
 


