How User-Defined AggregateFunctions handle deletes of all aggregated rows.

7 messages

Rex Fenley
Hello,

I'd like to better understand delete behavior of AggregateFunctions. Let's assume there's an aggregate of `user_id` to a set of `group_ids` for groups belonging to that user.
`user_id_1 -> [group_id_1, group_id_2, etc.]`
Now let's assume that, sometime later, deletes arrive for all the rows that produce user_id_1's group_ids.

Would the aggregate function completely delete the associated state from RocksDB or would it leave something like `user_id_1 -> []` sitting in RocksDB forever?

We have an aggregate similar to this where users could delete themselves and we want to make sure we're not accumulating data forever for those users.

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Re: How User-Defined AggregateFunctions handle deletes of all aggregated rows.

Danny Chan-2
Hi, Rex Fenley ~

If there is a stateful operator downstream of the aggregate function, then each time the function receives an update (or delete) for the key, the agg operator emits two messages: one retracting the old record and one carrying the new record. In your case, the new message is the DELETE.

If there is no stateful operator downstream, the aggregate operator just emits the update-after (new) message, which here is the delete.

In reply to Rex Fenley <[hidden email]>, Wed, Dec 9, 2020 at 4:30 AM.

Re: How User-Defined AggregateFunctions handle deletes of all aggregated rows.

Rex Fenley
So from what I understand, the aggregate itself is not a "stateful operator" but one may follow it? How does the aggregate accumulator keep old values then? It can't all just live in memory. Actually, looking at the savepoints, it looks like there is state associated with our aggregate operator.

To clarify my concern too: in the retract function of my aggregate function class, all I do is remove a value (a group id) from the accumulator set (which is an array). For example, if there is only one group_id left for a user and it gets deleted, that group_id is removed from the accumulator set and the set becomes empty. I would hope that at that point, given that there are no remaining rows for the aggregate, either I could or Flink would delete the stored accumulator altogether, i.e. delete `user_id_1 -> []`. Is it possible that both the groups and the user need to be deleted for everything to clear from storage? That might make more sense, actually.
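
The retract logic described above might look roughly like the following. This is a plain-Python model, not Flink's API: `GroupIdAccumulator`, `accumulate`, and `retract` are hypothetical names chosen to mirror the description, and the accumulator is kept as a list because the set is said to be stored as an array. It only shows that retracting the last group_id leaves an empty accumulator behind; whether the stored entry itself is then dropped is decided by whatever holds the state, not by this function.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class GroupIdAccumulator:
    """Hypothetical accumulator: the set of group ids, stored as a list."""
    group_ids: List[str] = field(default_factory=list)


def accumulate(acc: GroupIdAccumulator, group_id: str) -> None:
    # Add the id only if it is not already present (set semantics).
    if group_id not in acc.group_ids:
        acc.group_ids.append(group_id)


def retract(acc: GroupIdAccumulator, group_id: str) -> None:
    # Remove the id if present; retracting an unknown id is a no-op here.
    if group_id in acc.group_ids:
        acc.group_ids.remove(group_id)


acc = GroupIdAccumulator()
accumulate(acc, "group_id_1")
accumulate(acc, "group_id_2")
retract(acc, "group_id_1")
retract(acc, "group_id_2")
print(acc.group_ids)  # prints []
```

After the last retract the accumulator object still exists and is simply empty, which is exactly the `user_id_1 -> []` situation in question.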

If this doesn't happen, then since users delete themselves and their groups all the time, we'll be storing all these empty data sets in RocksDB for no reason. To clarify, we're using Debezium as our source and Flink as a materialization engine, so we never want to explicitly set a timeout on any of our data; we just want to scale up predictably with our user growth.

Thanks!

In reply to Danny Chan <[hidden email]>, Wed, Dec 9, 2020 at 4:14 AM.

Re: How User-Defined AggregateFunctions handle deletes of all aggregated rows.

Danny Chan-2
No, the group agg, stream-stream join, and rank are all stateful operators that need a state backend to keep track of the acc values.

But retractions only need to be emitted when a stateful operator A has a downstream operator B that is also stateful, because B needs the retractions to correct its accs. If B is not stateful, emitting just the new record to override the old one is enough.

When you receive a retraction message, you just need to correct the acc state to what it is expected to be (say, re-evaluate the acc without the record being retracted).
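
To make the two emission modes concrete, here is a small plain-Python model of a keyed group aggregate. It is a sketch, not Flink's implementation: `GroupAggSim`, `downstream_is_stateful`, and the `+I`/`-U`/`+U`/`-D` tags are illustrative names, and the step that drops a key's state once its accumulator is empty is an assumption about how such an operator could behave, not something confirmed in this thread.

```python
from typing import Dict, List, Tuple

Message = Tuple[str, str, Tuple[str, ...]]  # (kind, key, aggregated value)


class GroupAggSim:
    """Toy keyed aggregate that collects group ids per user id."""

    def __init__(self, downstream_is_stateful: bool) -> None:
        self.downstream_is_stateful = downstream_is_stateful
        self.state: Dict[str, List[str]] = {}  # stands in for the state backend
        self.out: List[Message] = []

    def process(self, is_retract: bool, user_id: str, group_id: str) -> None:
        had_state = user_id in self.state
        acc = self.state.get(user_id, [])
        old = tuple(acc)
        # Correct the acc: re-evaluate it with or without this record.
        if is_retract:
            acc = [g for g in acc if g != group_id]
        elif group_id not in acc:
            acc = acc + [group_id]
        new = tuple(acc)
        if new == old:
            return  # nothing changed, nothing to emit

        if not acc:
            # ASSUMPTION: with no rows left, emit a delete and drop the key.
            self.out.append(("-D", user_id, old))
            self.state.pop(user_id, None)
            return
        if not had_state:
            self.out.append(("+I", user_id, new))
        else:
            if self.downstream_is_stateful:
                # Downstream keeps its own accs, so retract the old value first.
                self.out.append(("-U", user_id, old))
            self.out.append(("+U", user_id, new))
        self.state[user_id] = acc


agg = GroupAggSim(downstream_is_stateful=True)
agg.process(False, "user_id_1", "group_id_1")
agg.process(False, "user_id_1", "group_id_2")
agg.process(True, "user_id_1", "group_id_1")
agg.process(True, "user_id_1", "group_id_2")
for msg in agg.out:
    print(msg)
print("user_id_1" in agg.state)  # prints False
```

With `downstream_is_stateful=False` the same inputs produce no `-U` messages, only the overriding new values and the final delete.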

In reply to Rex Fenley <[hidden email]>, Thu, Dec 10, 2020 at 2:44 AM.

Re: How User-Defined AggregateFunctions handle deletes of all aggregated rows.

Rex Fenley
Ok, that makes sense.

> You just need to correct the acc state to what it expects to be (say re-evaluate the acc without the record that needs retraction) when you received  the retraction message.

So, for example, if I just remove all items from acc.groupIdSet on retraction, will it know to clear the state out of RocksDB entirely?

If a user gets deleted altogether (and my group-by is on user_id), what sort of retraction do I need to evaluate then? I'm now thinking it will need to delete the state entirely and pass a full retraction of the state downstream, but I don't know whether deleting state from RocksDB happens automatically or whether I need to make it happen in the retract method somehow.

In reply to Danny Chan <[hidden email]>, Wed, Dec 9, 2020 at 6:16 PM.

Re: How User-Defined AggregateFunctions handle deletes of all aggregated rows.

Rex Fenley
Hi,

Does this question make sense or am I missing something?

Thanks!

In reply to Rex Fenley <[hidden email]>, Thu, Dec 10, 2020 at 10:24 AM.

Re: How User-Defined AggregateFunctions handle deletes of all aggregated rows.

Rex Fenley
Hello,

I still don't have a good understanding of how a UDAF in the Table API handles deletes. If every row aggregated under one groupBy(key) gets retracted, meaning nothing should be grouped under that key any more, will the state get deleted? Is there a way to delete the state for that row, i.e. forward a retract but not an append, and remove the state from RocksDB?

Thanks!

In reply to Rex Fenley <[hidden email]>, Fri, Dec 11, 2020 at 9:15 AM.