Hello, In the recent version of Flink docs I read the following [1]: > If an accumulator needs to store large amounts of data, org.apache.flink.table.api.dataview.ListView
and org.apache.flink.table.api.dataview.MapView provide advanced features for leveraging Flink’s state
backends in unbounded data scenarios. Please see the docs of the corresponding classes for more information
about this advanced feature.Our job has unbounded state from Debezium/Kafka, uses RocksDB, and we have a number of Aggregators like the following, which group a set of ids by some foreign key "group_id". The sets are usually 10-100 ids in size, but at the largest the sets could at some theoretical point get into the tens of thousands of ids (right now largest sets are ~2000 ids). table.groupBy($"group_id") .aggregate( new IDsAgg()( $"member_id" ) as ("member_ids") ) .select($"group_id", $"member_ids") case class IDsAcc( var IDs: mutable.Set[Long] ) class IDsAgg extends AggregateFunction[Row, IDsAcc] { override def createAccumulator(): IDsAcc = IDsAcc(mutable.Set()) def accumulate( acc: IDsAcc, ID: Long ): Unit = { acc.IDs.add(ID) } def retract(acc: IDsAcc, ID: Long): Unit = { acc.IDs.remove(ID) } def resetAccumulator(acc: IDsAcc): Unit = { acc.IDs = mutable.Set() } override def getValue(acc: IDsAcc): Row = { Row.of(acc.IDs.toArray) } override def getResultType: TypeInformation[Row] = { new RowTypeInfo( createTypeInformation[Array[Long]] ) } } I read the docs [2] but I don't see it really say anything about why ListView is better than just using a Set or Array. If we were to move from a Set to a ListView what advantages might we see in these Aggregates? I also noticed that ListView existed in 1.11 (we're on 1.11.2), did we simply miss this feature? Does it work for 1.11.x too? Thanks! Rex Fenley | Software Engineer - Mobile and Backend Remind.com | BLOG | FOLLOW US | LIKE US |
Hi Rex,
ListView and MapView have been part of Flink for years. However, they were considered as an internal feature and therefore not well documented. MapView is used internally to make distinct aggregates work. Because we reworked the type inference of aggregate functions, we also added basic documentation for power users. By default an accumulator will be deserialized from state on every access. ListView and MapView are not deserialized entirely on access but delegate directly to a state backend. Thus, only the key that is accessed is deserialized. So if an accumulator stores a lot of data, it might be beneficial to use the mentioned abstractions. Regards, Timo On 16.01.21 20:09, Rex Fenley wrote: > Hello, > > In the recent version of Flink docs I read the following [1]: > > If an accumulator needs to store large amounts of data, > |org.apache.flink.table.api.dataview.ListView| and > |org.apache.flink.table.api.dataview.MapView| provide advanced features > for leveraging Flink’s state backends in unbounded data scenarios. > Please see the docs of the corresponding classes for more information > about this advanced feature. > > Our job has unbounded state from Debezium/Kafka, uses RocksDB, and we > have a number of Aggregators like the following, which group a set of > ids by some foreign key "group_id". The sets are usually 10-100 ids in > size, but at the largest the sets could at some theoretical point get > into the tens of thousands of ids (right now largest sets are ~2000 ids). > > table.groupBy($"group_id") > .aggregate( > newIDsAgg()( > $"member_id" > ) as ("member_ids") > ) > .select($"group_id", $"member_ids") > > caseclassIDsAcc( > varIDs: mutable.Set[Long] > ) > classIDsAgg extendsAggregateFunction[Row, IDsAcc] { > > overridedefcreateAccumulator(): IDsAcc = > IDsAcc(mutable.Set()) > > defaccumulate( > acc: IDsAcc, > ID: Long > ): Unit = { > acc.IDs.add(ID) > } > > defretract(acc: IDsAcc, ID: Long): Unit = { > acc.IDs.remove(ID) > } > > defresetAccumulator(acc: IDsAcc): Unit = { > acc.IDs = mutable.Set() > } > > overridedefgetValue(acc: IDsAcc): Row = { > Row.of(acc.IDs.toArray) > } > > overridedefgetResultType: TypeInformation[Row] = { > newRowTypeInfo( > createTypeInformation[Array[Long]] > ) > } > } > > I read the docs [2] but I don't see it really say anything about why > ListView is better than just using a Set or Array. > If we were to move from a Set to a ListView what advantages might we see > in these Aggregates? > > I also noticed that ListView existed in 1.11 (we're on 1.11.2), did we > simply miss this feature? Does it work for 1.11.x too? > > Thanks! > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods > <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods> > [2] > https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html > <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html> > > -- > > Rex Fenley|Software Engineer - Mobile and Backend > > > Remind.com <https://www.remind.com/>| BLOG <http://blog.remind.com/> | > FOLLOW US <https://twitter.com/remindhq> | LIKE US > <https://www.facebook.com/remindhq> > |
Fascinating, do you have an estimate of what qualifies as a lot of data and therefore when this should be used? Thanks On Mon, Jan 18, 2021 at 12:35 AM Timo Walther <[hidden email]> wrote: Hi Rex, -- Rex Fenley | Software Engineer - Mobile and Backend Remind.com | BLOG | FOLLOW US | LIKE US |
As always, this depends on the use case ;-)
In general, you should not get a performance regression in using them. But keep in mind that ListViews/MapViews cannot be backed by a state backend in every operator, so sometimes they are represented as List/Maps on heap. Regards, Timo On 18.01.21 18:28, Rex Fenley wrote: > Fascinating, do you have an estimate of what qualifies as a lot of data > and therefore when this should be used? > > Thanks > > On Mon, Jan 18, 2021 at 12:35 AM Timo Walther <[hidden email] > <mailto:[hidden email]>> wrote: > > Hi Rex, > > ListView and MapView have been part of Flink for years. However, they > were considered as an internal feature and therefore not well > documented. MapView is used internally to make distinct aggregates work. > > Because we reworked the type inference of aggregate functions, we also > added basic documentation for power users. > > By default an accumulator will be deserialized from state on every > access. ListView and MapView are not deserialized entirely on access > but > delegate directly to a state backend. Thus, only the key that is > accessed is deserialized. So if an accumulator stores a lot of data, it > might be beneficial to use the mentioned abstractions. > > Regards, > Timo > > > On 16.01.21 20:09, Rex Fenley wrote: > > Hello, > > > > In the recent version of Flink docs I read the following [1]: > > > If an accumulator needs to store large amounts of data, > > |org.apache.flink.table.api.dataview.ListView| and > > |org.apache.flink.table.api.dataview.MapView| provide advanced > features > > for leveraging Flink’s state backends in unbounded data scenarios. > > Please see the docs of the corresponding classes for more > information > > about this advanced feature. > > > > Our job has unbounded state from Debezium/Kafka, uses RocksDB, > and we > > have a number of Aggregators like the following, which group a > set of > > ids by some foreign key "group_id". The sets are usually 10-100 > ids in > > size, but at the largest the sets could at some theoretical point > get > > into the tens of thousands of ids (right now largest sets are > ~2000 ids). > > > > table.groupBy($"group_id") > > .aggregate( > > newIDsAgg()( > > $"member_id" > > ) as ("member_ids") > > ) > > .select($"group_id", $"member_ids") > > > > caseclassIDsAcc( > > varIDs: mutable.Set[Long] > > ) > > classIDsAgg extendsAggregateFunction[Row, IDsAcc] { > > > > overridedefcreateAccumulator(): IDsAcc = > > IDsAcc(mutable.Set()) > > > > defaccumulate( > > acc: IDsAcc, > > ID: Long > > ): Unit = { > > acc.IDs.add(ID) > > } > > > > defretract(acc: IDsAcc, ID: Long): Unit = { > > acc.IDs.remove(ID) > > } > > > > defresetAccumulator(acc: IDsAcc): Unit = { > > acc.IDs = mutable.Set() > > } > > > > overridedefgetValue(acc: IDsAcc): Row = { > > Row.of(acc.IDs.toArray) > > } > > > > overridedefgetResultType: TypeInformation[Row] = { > > newRowTypeInfo( > > createTypeInformation[Array[Long]] > > ) > > } > > } > > > > I read the docs [2] but I don't see it really say anything about why > > ListView is better than just using a Set or Array. > > If we were to move from a Set to a ListView what advantages might > we see > > in these Aggregates? > > > > I also noticed that ListView existed in 1.11 (we're on 1.11.2), > did we > > simply miss this feature? Does it work for 1.11.x too? > > > > Thanks! > > > > [1] > > > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods > <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods> > > > > <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods > <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods>> > > [2] > > > https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html > <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html> > > > > <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html > <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html>> > > > > -- > > > > Rex Fenley|Software Engineer - Mobile and Backend > > > > > > Remind.com <https://www.remind.com/ <https://www.remind.com/>>| > BLOG <http://blog.remind.com/ <http://blog.remind.com/>> | > > FOLLOW US <https://twitter.com/remindhq > <https://twitter.com/remindhq>> | LIKE US > > <https://www.facebook.com/remindhq > <https://www.facebook.com/remindhq>> > > > > > > -- > > Rex Fenley|Software Engineer - Mobile and Backend > > > Remind.com <https://www.remind.com/>| BLOG <http://blog.remind.com/> | > FOLLOW US <https://twitter.com/remindhq> | LIKE US > <https://www.facebook.com/remindhq> > |
Thanks! On Tue, Jan 19, 2021 at 12:55 AM Timo Walther <[hidden email]> wrote: As always, this depends on the use case ;-) -- Rex Fenley | Software Engineer - Mobile and Backend Remind.com | BLOG | FOLLOW US | LIKE US |
Free forum by Nabble | Edit this page |