Team,
I have added Elasticsearch as a sink to a stream and am inserting JSON data into it. The problem is that when I restore the application after a crash, it reprocesses the data in between. Meanwhile a backend application has updated the documents in ES, so when Flink reinserts them, all of those updates to ES are lost.

I am trying to do an update-or-insert: insert the document when it is not found, and do not insert when the document is already there.
Hi ApoorvK,

Elasticsearch supports a "create" mode when indexing. By default, indexing overwrites a document that has the same ID, but you can tell ES to refuse to overwrite. See op_type in the index API docs:
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#docs-index-api-query-params

Looking at the Elasticsearch sink, it doesn't seem to be implemented currently, but it should be relatively easy to add.

On Mon, Feb 10, 2020 at 9:26 AM ApoorvK <[hidden email]> wrote:
> Team,
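
To make the difference between the default "index" mode and "create" mode concrete, here is a minimal sketch of the replay scenario described in the question. It is a toy in-memory model, not the Elasticsearch client or the Flink sink: `ToyIndex`, `OpTypeDemo`, and `replay` are hypothetical names made up for illustration, and the document contents are invented.

```scala
import scala.collection.mutable

// Toy in-memory stand-in for an index. This is NOT the Elasticsearch client;
// it only mimics the overwrite rules of the two op_type values.
object OpTypeDemo {
  sealed trait OpType
  case object Index extends OpType   // default: overwrite any existing document
  case object Create extends OpType  // refuse to overwrite an existing ID

  final class ToyIndex {
    private val docs = mutable.Map.empty[String, String]

    // Right(()) on success, Left(message) when Create hits an existing ID.
    def write(id: String, json: String, opType: OpType): Either[String, Unit] =
      opType match {
        case Create if docs.contains(id) =>
          Left(s"document already exists: $id")
        case _ =>
          docs(id) = json
          Right(())
      }

    def get(id: String): Option[String] = docs.get(id)
  }

  // Stream writes a document, a backend updates it, then the stream replays
  // the same write after a restore. Returns what is left in the index.
  def replay(sinkOpType: OpType): Option[String] = {
    val idx = new ToyIndex
    idx.write("1", """{"status":"new"}""", sinkOpType) // first write from the stream
    idx.write("1", """{"status":"done"}""", Index)     // backend application's update
    idx.write("1", """{"status":"new"}""", sinkOpType) // replayed write after restore
    idx.get("1")
  }
}
```

In this model, `OpTypeDemo.replay(OpTypeDemo.Index)` ends with the stale document and the backend's update is lost, while `OpTypeDemo.replay(OpTypeDemo.Create)` keeps the backend's version because the replayed write is rejected. With a real cluster, the rejection would surface as a version-conflict error that the sink has to tolerate.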
I tried providing opType to the Elasticsearch builder. I get the error message "document already exists" on my console, but it still updates the value in Elasticsearch.

val jsonString = write(record)

On Mon, Feb 10, 2020 at 1:42 PM Itamar Syn-Hershko <[hidden email]> wrote:
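
A possible alternative to opType, sketched here under assumptions and not confirmed anywhere in this thread, is an update request that carries an empty `doc` plus an `upsert` body. As far as I understand the Elasticsearch update API, the `upsert` content is inserted when the ID is missing, and an empty `doc` merged into an existing document changes nothing. The helper below only builds the two lines such an action would occupy in a raw `_bulk` request body; `UpsertBody` and `insertIfAbsent` are hypothetical names, and the index name and ID are assumed to need no JSON escaping.

```scala
// Builds the NDJSON pair for one "insert only if absent" bulk action.
// Dependency-free sketch: it formats strings and sends nothing to a cluster.
object UpsertBody {
  // `json` is assumed to already be a valid JSON object string,
  // for example the result of `write(record)`.
  def insertIfAbsent(index: String, id: String, json: String): String = {
    // Action line: an update addressed to a specific index and document ID.
    val action = s"""{"update":{"_index":"$index","_id":"$id"}}"""
    // Payload line: empty partial doc (nothing to merge) plus the upsert body.
    val body = s"""{"doc":{},"upsert":$json}"""
    action + "\n" + body + "\n"
  }
}
```

For example, `UpsertBody.insertIfAbsent("events", "1", jsonString)` yields the action line followed by the payload line, each terminated by a newline as the bulk format requires. Whether the Flink sink version in use can emit update requests is something to check against its documentation.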