Hello, I'm using the Table API to do a bunch of stateful transformations on CDC Debezium rows and then insert final documents into Elasticsearch via the ES connector. I've noticed that Elasticsearch is constantly deleting and then inserting documents as they update. Ideally, there would be no delete operation for a row update, only for an actual row delete. I'm using the Elasticsearch 7 SQL connector, which I'm assuming uses `Elasticsearch7UpsertTableSink` under the hood, which implies upserts are what it's actually capable of. Therefore, I think it's possibly my table plan that's causing row upserts to turn into deletes + inserts. My plan is essentially a series of Joins and GroupBys + UDF Aggregates (aggregating arrays of data). I think the UDF Aggs following the Joins + GroupBys may be causing the upserts to split into deletes + inserts somehow. If this is correct, is it possible to write UDFs that preserve upserts? Or am I totally off-base with my assumptions? Thanks! -- Rex Fenley | Software Engineer - Mobile and Backend Remind.com | BLOG | FOLLOW US | LIKE US |
Also, just to be clear, our ES connector definition looks like this:

CREATE TABLE sink_es_groups (
    id BIGINT,
    -- .. a bunch of scalar fields
    array_of_ids ARRAY<BIGINT NOT NULL>,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = '${env:ELASTICSEARCH_HOSTS}',
    'index' = '${env:GROUPS_ES_INDEX}',
    'format' = 'json',
    'sink.bulk-flush.max-actions' = '512',
    'sink.bulk-flush.max-size' = '1mb',
    'sink.bulk-flush.interval' = '5000',
    'sink.bulk-flush.backoff.delay' = '1000',
    'sink.bulk-flush.backoff.max-retries' = '4',
    'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
)

On Thu, Nov 5, 2020 at 11:52 AM Rex Fenley <[hidden email]> wrote:
Hi Rex, A similar question was asked recently which I think has the same cause [1], called retraction amplification. You can try turning on the mini-batch optimization to reduce the retraction amplification. Best, Jark On Fri, 6 Nov 2020 at 03:56, Rex Fenley <[hidden email]> wrote:
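[For reference, mini-batch is enabled through table configuration. A minimal sketch in SQL-client style; the option keys are Flink's standard `table.exec.mini-batch.*` settings, but the latency and size values below are illustrative, not recommendations:]

```sql
-- Enable mini-batch processing so the aggregation operator buffers
-- input records and folds intermediate retractions before emitting.
SET 'table.exec.mini-batch.enabled' = 'true';
-- Maximum time to buffer records before flushing a batch (illustrative).
SET 'table.exec.mini-batch.allow-latency' = '1 s';
-- Maximum number of buffered records per aggregation batch (illustrative).
SET 'table.exec.mini-batch.size' = '1000';
```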
Thanks! We did give that a shot and ran into the bug that I reported here: https://issues.apache.org/jira/browse/FLINK-20036 . I'm also seeing the `emitUpdateWithRetract` function, and it says it's more performant in some cases vs `emitValue`. I'm having some trouble understanding in which cases it benefits performance and whether it would help our case. Would using `emitUpdateWithRetract` instead of `emitValue` reduce the number of retracts we're seeing yet preserve the same end results, where our Elasticsearch documents stay up to date? On Sun, Nov 8, 2020 at 6:43 PM Jark Wu <[hidden email]> wrote:
Hi, Does this seem like it would help? Thanks! On Tue, Nov 10, 2020 at 10:38 PM Rex Fenley <[hidden email]> wrote:
Hi Rex, Sorry for the late response. Under the hood, if the UDTAF only implements `emitValue`, then the framework will call `emitValue` for every input record. Assuming this is a TopN UDTAF whose `emitValue` returns the set [A, B, C] for input1 and the set [A, B, D] for input2, the framework will send -A, -B, -C, +A, +B, +D after processing input2. But if the TopN UDTAF implements `emitUpdateWithRetract`, it can send just -C and +D, because the TopN knows which row was updated. So it can "reduce the number of retracts". Best, Jark On Wed, 18 Nov 2020 at 14:32, Rex Fenley <[hidden email]> wrote:
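[The difference in emitted changelog volume can be illustrated with a small, self-contained Python sketch. This is not the Flink API, just the changelog arithmetic behind Jark's example; the function names are made up for illustration:]

```python
def emit_value_changelog(old, new):
    """Framework behavior when only emitValue is implemented:
    retract every previously emitted row, then emit the full new set."""
    return [("-", r) for r in old] + [("+", r) for r in new]

def emit_update_with_retract_changelog(old, new):
    """Behavior a UDTAF can achieve with emitUpdateWithRetract:
    retract only rows that disappeared, emit only rows that appeared."""
    old_set, new_set = set(old), set(new)
    return ([("-", r) for r in old if r not in new_set]
            + [("+", r) for r in new if r not in old_set])

old_result = ["A", "B", "C"]   # result set after input1
new_result = ["A", "B", "D"]   # result set after input2

print(emit_value_changelog(old_result, new_result))
# 6 messages: -A, -B, -C, +A, +B, +D
print(emit_update_with_retract_changelog(old_result, new_result))
# 2 messages: -C, +D
```

[Every retraction that reaches the sink can become a delete against Elasticsearch, which is why shrinking the changelog at the aggregate also reduces the delete + insert churn downstream.]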
Wow, that sounds definitely better. I'll try porting our aggregates over to using `emitUpdateWithRetract` then. I'm assuming the Elasticsearch SQL connector will respond appropriately. Thanks for the help! On Wed, Nov 18, 2020 at 7:20 AM Jark Wu <[hidden email]> wrote: