Upserts with Flink-elasticsearch

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

Upserts with Flink-elasticsearch

Madhukar Thota
Is it possible to do Upsert with existing flink-elasticsearch connector today?
Reply | Threaded
Open this post in threaded view
|

Re: Upserts with Flink-elasticsearch

Zach Cox
Hi Madhukar - with the current Elasticsearch sink in Flink 1.0.0 [1], I don't think an upsert is possible, since IndexRequestBuilder can only return an IndexRequest.

In Flink 1.1, the Elasticsearch 2.x sink [2] provides a RequestIndexer [3] that you can pass an UpdateRequest to do an upsert.

Thanks,
Zach


On Mon, Mar 28, 2016 at 2:18 PM Madhukar Thota <[hidden email]> wrote:
Is it possible to do Upsert with existing flink-elasticsearch connector today?
Reply | Threaded
Open this post in threaded view
|

Re: Upserts with Flink-elasticsearch

Suneel Marthi
Would it be useful to modify the existing Elasticsearch 1x sink to be able to handle Upserts ?


On Mon, Mar 28, 2016 at 5:32 PM, Zach Cox <[hidden email]> wrote:
Hi Madhukar - with the current Elasticsearch sink in Flink 1.0.0 [1], I don't think an upsert is possible, since IndexRequestBuilder can only return an IndexRequest.

In Flink 1.1, the Elasticsearch 2.x sink [2] provides a RequestIndexer [3] that you can pass an UpdateRequest to do an upsert.

Thanks,
Zach


On Mon, Mar 28, 2016 at 2:18 PM Madhukar Thota <[hidden email]> wrote:
Is it possible to do Upsert with existing flink-elasticsearch connector today?

Reply | Threaded
Open this post in threaded view
|

Re: Upserts with Flink-elasticsearch

Hung
In reply to this post by Zach Cox
Hi Zach,

For using upsert in ES2, I guess it looks like as follows? However I cannot find which method in Request returns UpdateRequest while Requests.indexRequest() returns IndexRequest. Can I ask did you know it?

public static UpdateRequest updateIndexRequest(String element) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);        

       // Wrong call
        return Requests.updateRequest
                .index(index)
                .type(type)
                .id(element)
                .source(json);
    }

Best,

Sendoh
Reply | Threaded
Open this post in threaded view
|

Re: Upserts with Flink-elasticsearch

Zach Cox
You can just create a new UpdateRequest instance directly using its constructor [1] like this:

        return new UpdateRequest()
                .index(index)
                .type(type)
                .id(element)
                .source(json);



On Tue, Mar 29, 2016 at 8:28 AM HungChang <[hidden email]> wrote:
Hi Zach,

For using upsert in ES2, I guess it looks like as follows? However I cannot
find which method in Request returns UpdateRequest while
Requests.indexRequest() returns IndexRequest. Can I ask did you know it?

public static UpdateRequest updateIndexRequest(String element) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);

       // Wrong call
        return Requests.updateRequest
                .index(index)
                .type(type)
                .id(element)
                .source(json);
    }

Best,

Sendoh



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Upserts-with-Flink-elasticsearch-tp5767p5787.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Reply | Threaded
Open this post in threaded view
|

Re: Upserts with Flink-elasticsearch

Aljoscha Krettek
@suneel if it is possible to add upsert support for ES 1.x I would go for it, yes.

On Tue, 29 Mar 2016 at 16:36 Zach Cox <[hidden email]> wrote:
You can just create a new UpdateRequest instance directly using its constructor [1] like this:

        return new UpdateRequest()
                .index(index)
                .type(type)
                .id(element)
                .source(json);



On Tue, Mar 29, 2016 at 8:28 AM HungChang <[hidden email]> wrote:
Hi Zach,

For using upsert in ES2, I guess it looks like as follows? However I cannot
find which method in Request returns UpdateRequest while
Requests.indexRequest() returns IndexRequest. Can I ask did you know it?

public static UpdateRequest updateIndexRequest(String element) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);

       // Wrong call
        return Requests.updateRequest
                .index(index)
                .type(type)
                .id(element)
                .source(json);
    }

Best,

Sendoh



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Upserts-with-Flink-elasticsearch-tp5767p5787.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Reply | Threaded
Open this post in threaded view
|

Re: Upserts with Flink-elasticsearch

Hung
This post was updated on .
In reply to this post by Zach Cox
Without indexRequest ES2 throws `document does not exist` exception.

Based on  
https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-update.html#java-docs-update-api-upsert 

the upsert works althrough not sure it's the best way.

return new UpdateRequest()
                .index(sample_index)
                .type(sample_type)
                .id(String.valueOf(id))
                .doc(json)
                .upsert(Requests.indexRequest()
                .index(sample_index)
                .type(sample_type)
                .id(String.valueOf(id))
                .source(json));

Cheers,

Hung