Upsert UDFs

Rex Fenley
Hello,

I'm using the Table API to do a bunch of stateful transformations on CDC Debezium rows and then insert final documents into Elasticsearch via the ES connector.

I've noticed that Elasticsearch is constantly deleting and then re-inserting documents as they update. Ideally, a row update would not trigger a delete at all; only a row delete would. I'm using the Elasticsearch 7 SQL connector, which I assume uses `Elasticsearch7UpsertTableSink` under the hood, so the sink itself should be capable of upserts.

So I suspect my table plan is what turns row upserts into delete + insert pairs. The plan is essentially a series of Joins and GroupBys followed by UDF Aggregates (aggregating arrays of data). My guess is that the UDF Aggs following the Joins + GroupBys somehow split each upsert into a delete and an insert. If that's correct, is it possible to write UDFs that preserve upserts? Or am I totally off-base with my assumptions?

Thanks!
--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Re: Upsert UDFs

Rex Fenley
Also, just to be clear, our ES connector looks like this:

CREATE TABLE sink_es_groups (
  id BIGINT,
  -- ... a bunch of scalar fields
  array_of_ids ARRAY<BIGINT NOT NULL>,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = '${env:ELASTICSEARCH_HOSTS}',
  'index' = '${env:GROUPS_ES_INDEX}',
  'format' = 'json',
  'sink.bulk-flush.max-actions' = '512',
  'sink.bulk-flush.max-size' = '1mb',
  'sink.bulk-flush.interval' = '5000',
  'sink.bulk-flush.backoff.delay' = '1000',
  'sink.bulk-flush.backoff.max-retries' = '4',
  'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
)


In reply to Rex Fenley's message of Thu, Nov 5, 2020 at 11:52 AM.

Re: Upsert UDFs

Jark Wu-3
Hi Rex,

A similar question was asked recently [1], and I think the cause is the same: retraction amplification.
You can try turning on the mini-batch optimization to reduce it.
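
For reference, mini-batch aggregation is controlled by the following Table API configuration options. This is only a sketch: the option keys are the documented ones, but the values below are illustrative assumptions, and how you apply them (through the `TableConfig` of your `TableEnvironment`, or through the SQL client configuration) depends on your setup and Flink version.

```
# Buffer input records and process them in small batches (off by default).
table.exec.mini-batch.enabled: true
# Maximum time records may be buffered before a batch is flushed (illustrative value).
table.exec.mini-batch.allow-latency: 5 s
# Maximum number of records buffered per batch (illustrative value).
table.exec.mini-batch.size: 5000
```

Larger batches fold more intermediate updates per key before anything is emitted downstream, at the cost of added latency.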

Best,
Jark


In reply to Rex Fenley's message of Fri, 6 Nov 2020 at 03:56.

Re: Upsert UDFs

Rex Fenley
Thanks! We did give that a shot and ran into the bug that I reported here: https://issues.apache.org/jira/browse/FLINK-20036

I also see this function
  public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out); // OPTIONAL
which is described as more performant in some cases than
  public void emitValue(ACC accumulator, Collector<T> out); // OPTIONAL
I'm having some trouble understanding in which cases it improves performance and whether it would help ours. Would using `emitUpdateWithRetract` instead of `emitValue` reduce the number of retractions we're seeing while preserving the same end result, with our Elasticsearch documents staying up to date?

In reply to Jark Wu's message of Sun, Nov 8, 2020 at 6:43 PM.

Re: Upsert UDFs

Rex Fenley
Hi,

Does this seem like it would help?

Thanks!

In reply to Rex Fenley's message of Tue, Nov 10, 2020 at 10:38 PM.

Re: Upsert UDFs

Jark Wu-3
Hi Rex,

Sorry for the late response.

Under the hood, if the UDTAF only implements `emitValue`, the framework calls `emitValue` for every input record. Take a TopN UDTAF as an example: if `emitValue` returns the set [A, B, C] for input1 and the set [A, B, D] for input2, the framework sends -A, -B, -C, +A, +B, +D after processing input2.

If the TopN UDTAF implements `emitUpdateWithRetract` instead, it can send just -C and +D, because the TopN function knows which rows changed. So it can "reduce the number of retracts".
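
To make the difference concrete, here is a minimal plain-Java simulation of the two changelogs described above. It does not use any Flink classes; the names `RetractDemo`, `fullRefresh` and `incremental` are hypothetical, and diffing two result lists is only a stand-in for a real accumulator that remembers what it emitted last.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Plain-Java simulation of the two emission strategies; no Flink dependency.
final class RetractDemo {

    // emitValue-only behaviour: retract the entire previous result,
    // then emit the entire new result.
    static List<String> fullRefresh(List<String> previous, List<String> current) {
        List<String> changelog = new ArrayList<>();
        for (String row : previous) {
            changelog.add("-" + row);
        }
        for (String row : current) {
            changelog.add("+" + row);
        }
        return changelog;
    }

    // emitUpdateWithRetract-style behaviour: retract only rows that left
    // the result and emit only rows that entered it.
    static List<String> incremental(List<String> previous, List<String> current) {
        Set<String> prev = new LinkedHashSet<>(previous);
        Set<String> curr = new LinkedHashSet<>(current);
        List<String> changelog = new ArrayList<>();
        for (String row : prev) {
            if (!curr.contains(row)) {
                changelog.add("-" + row);
            }
        }
        for (String row : curr) {
            if (!prev.contains(row)) {
                changelog.add("+" + row);
            }
        }
        return changelog;
    }

    public static void main(String[] args) {
        List<String> afterInput1 = Arrays.asList("A", "B", "C");
        List<String> afterInput2 = Arrays.asList("A", "B", "D");
        System.out.println(fullRefresh(afterInput1, afterInput2)); // [-A, -B, -C, +A, +B, +D]
        System.out.println(incremental(afterInput1, afterInput2)); // [-C, +D]
    }
}
```

In this example the incremental strategy produces two changelog messages instead of six; the saving grows with the size of the result and shrinks when most rows change on every input.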

Best,
Jark

In reply to Rex Fenley's message of Wed, 18 Nov 2020 at 14:32.

Re: Upsert UDFs

Rex Fenley
Wow, that sounds definitely better. I'll try porting our aggregates over to `emitUpdateWithRetract` then. I'm assuming the Elasticsearch SQL connector will respond appropriately.

Thanks for the help!

In reply to Jark Wu's message of Wed, Nov 18, 2020 at 7:20 AM.