Elasticsearch6UpsertTableSink: how to trigger a delete in Elasticsearch

ouywl

Hi,

I am using Elasticsearch6UpsertTableSink, and it appears to implement deleting documents from the index, as in this code:

    @Override
    public void process(Tuple2<Boolean, Row> element, RuntimeContext ctx, RequestIndexer indexer) {
        if (element.f0) {
            // f0 == true: upsert message
            processUpsert(element.f1, indexer);
        } else {
            // f0 == false: delete message
            processDelete(element.f1, indexer);
        }
    }
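
For readers unfamiliar with the convention: in an upsert stream, the Boolean in f0 marks the kind of change, with true meaning "insert or update this key" and false meaning "delete this key". The following is a minimal sketch of that dispatch in plain Java, with no Flink or Elasticsearch dependencies. The names UpsertSinkSketch, Change, and apply are hypothetical, and the in-memory map only stands in for an index; it is not how the real sink is implemented.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UpsertSinkSketch {

    /** Hypothetical stand-in for Tuple2<Boolean, Row>: a change flag plus a keyed value. */
    public static final class Change {
        final boolean upsert; // true = upsert message, false = delete message
        final String key;     // plays the role of the document id
        final String value;   // ignored for delete messages

        public Change(boolean upsert, String key, String value) {
            this.upsert = upsert;
            this.key = key;
            this.value = value;
        }
    }

    /** Applies the changes in order to a map that stands in for an index (assumption). */
    public static Map<String, String> apply(List<Change> changes) {
        Map<String, String> index = new LinkedHashMap<>();
        for (Change c : changes) {
            if (c.upsert) {
                index.put(c.key, c.value); // insert or overwrite by key
            } else {
                index.remove(c.key);       // delete by key
            }
        }
        return index;
    }

    public static void main(String[] args) {
        Map<String, String> index = apply(Arrays.asList(
                new Change(true, "user-1", "clicks=1"),
                new Change(true, "user-1", "clicks=2"),
                new Change(true, "user-2", "clicks=1"),
                new Change(false, "user-1", null)));
        System.out.println(index);
    }
}
```

The delete branch only runs when the upstream query emits a message with the flag set to false, so whether deletes happen depends on the query, not on the sink.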

Re: Elasticsearch6UpsertTableSink: how to trigger a delete in Elasticsearch

Piotr Nowojski-3
Hi, 

Take a look at the documentation. Section [1] describes an example where a continuously running query produces updated results and thus retracts its previous results.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion
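
As an illustration of how updated results can lead to delete messages, here is a minimal sketch in plain Java that simulates one kind of query where a key can disappear from the result: an aggregation over another aggregation. The class UpsertStreamSketch, the method changesFor, the clicks input, and the printed message format are all hypothetical; this mimics the idea only and is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UpsertStreamSketch {

    /**
     * Simulates the changes produced by a query of this shape (illustrative only):
     *   SELECT cnt, COUNT(*) AS users
     *   FROM (SELECT user, COUNT(*) AS cnt FROM clicks GROUP BY user)
     *   GROUP BY cnt
     * Each input element is the user of one click. The result key is cnt.
     */
    public static List<String> changesFor(List<String> clicks) {
        Map<String, Integer> clicksPerUser = new HashMap<>();
        Map<Integer, Integer> usersPerCount = new HashMap<>();
        List<String> out = new ArrayList<>();

        for (String user : clicks) {
            int oldCnt = clicksPerUser.getOrDefault(user, 0);
            int newCnt = oldCnt + 1;
            clicksPerUser.put(user, newCnt);

            if (oldCnt > 0) {
                // The user leaves the group keyed by the old count.
                int remaining = usersPerCount.get(oldCnt) - 1;
                if (remaining == 0) {
                    // The key vanished from the result: emit a delete message.
                    usersPerCount.remove(oldCnt);
                    out.add("(false, cnt=" + oldCnt + ")");
                } else {
                    usersPerCount.put(oldCnt, remaining);
                    out.add("(true, cnt=" + oldCnt + ", users=" + remaining + ")");
                }
            }
            // The user joins the group keyed by the new count: emit an upsert message.
            int users = usersPerCount.merge(newCnt, 1, Integer::sum);
            out.add("(true, cnt=" + newCnt + ", users=" + users + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        changesFor(Arrays.asList("Mary", "Bob", "Mary", "Bob"))
                .forEach(System.out::println);
    }
}
```

In this sketch the delete message appears once the last user leaves the cnt=1 group. A query whose keys never disappear, such as a single GROUP BY count, would only ever produce upsert messages here.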

Piotrek

On 16 Oct 2019, at 09:25, ouywl <[hidden email]> wrote:


Hi,

I am using Elasticsearch6UpsertTableSink, and it appears to implement deleting documents from the index, as in this code:

    @Override
    public void process(Tuple2<Boolean, Row> element, RuntimeContext ctx, RequestIndexer indexer) {
        if (element.f0) {
            processUpsert(element.f1, indexer);
        } else {
            processDelete(element.f1, indexer);
        }
    }