Why use ListView?

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Why use ListView?

Rex Fenley
Hello,

In the recent version of Flink docs I read the following [1]:
> If an accumulator needs to store large amounts of data, org.apache.flink.table.api.dataview.ListView and org.apache.flink.table.api.dataview.MapView provide advanced features for leveraging Flink’s state backends in unbounded data scenarios. Please see the docs of the corresponding classes for more information about this advanced feature.

Our job has unbounded state from Debezium/Kafka, uses RocksDB, and we have a number of Aggregators like the following, which group a set of ids by some foreign key "group_id". The sets are usually 10-100 ids in size, but at the largest the sets could at some theoretical point get into the tens of thousands of ids (right now largest sets are ~2000 ids).

table.groupBy($"group_id")
.aggregate(
new IDsAgg()(
$"member_id"
) as ("member_ids")
)
.select($"group_id", $"member_ids")

case class IDsAcc(
var IDs: mutable.Set[Long]
)
class IDsAgg extends AggregateFunction[Row, IDsAcc] {

override def createAccumulator(): IDsAcc =
IDsAcc(mutable.Set())

def accumulate(
acc: IDsAcc,
ID: Long
): Unit = {
acc.IDs.add(ID)
}

def retract(acc: IDsAcc, ID: Long): Unit = {
acc.IDs.remove(ID)
}

def resetAccumulator(acc: IDsAcc): Unit = {
acc.IDs = mutable.Set()
}

override def getValue(acc: IDsAcc): Row = {
Row.of(acc.IDs.toArray)
}

override def getResultType: TypeInformation[Row] = {
new RowTypeInfo(
createTypeInformation[Array[Long]]
)
}
}

I read the docs [2] but I don't see it really say anything about why ListView is better than just using a Set or Array.
If we were to move from a Set to a ListView what advantages might we see in these Aggregates?

I also noticed that ListView existed in 1.11 (we're on 1.11.2), did we simply miss this feature? Does it work for 1.11.x too?

Thanks!


--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Reply | Threaded
Open this post in threaded view
|

Re: Why use ListView?

Timo Walther
Hi Rex,

ListView and MapView have been part of Flink for years. However, they
were considered as an internal feature and therefore not well
documented. MapView is used internally to make distinct aggregates work.

Because we reworked the type inference of aggregate functions, we also
added basic documentation for power users.

By default an accumulator will be deserialized from state on every
access. ListView and MapView are not deserialized entirely on access but
delegate directly to a state backend. Thus, only the key that is
accessed is deserialized. So if an accumulator stores a lot of data, it
might be beneficial to use the mentioned abstractions.

Regards,
Timo


On 16.01.21 20:09, Rex Fenley wrote:

> Hello,
>
> In the recent version of Flink docs I read the following [1]:
>  > If an accumulator needs to store large amounts of data,
> |org.apache.flink.table.api.dataview.ListView| and
> |org.apache.flink.table.api.dataview.MapView| provide advanced features
> for leveraging Flink’s state backends in unbounded data scenarios.
> Please see the docs of the corresponding classes for more information
> about this advanced feature.
>
> Our job has unbounded state from Debezium/Kafka, uses RocksDB, and we
> have a number of Aggregators like the following, which group a set of
> ids by some foreign key "group_id". The sets are usually 10-100 ids in
> size, but at the largest the sets could at some theoretical point get
> into the tens of thousands of ids (right now largest sets are ~2000 ids).
>
> table.groupBy($"group_id")
> .aggregate(
> newIDsAgg()(
> $"member_id"
> ) as ("member_ids")
> )
> .select($"group_id", $"member_ids")
>
> caseclassIDsAcc(
> varIDs: mutable.Set[Long]
> )
> classIDsAgg extendsAggregateFunction[Row, IDsAcc] {
>
> overridedefcreateAccumulator(): IDsAcc =
> IDsAcc(mutable.Set())
>
> defaccumulate(
> acc: IDsAcc,
> ID: Long
> ): Unit = {
> acc.IDs.add(ID)
> }
>
> defretract(acc: IDsAcc, ID: Long): Unit = {
> acc.IDs.remove(ID)
> }
>
> defresetAccumulator(acc: IDsAcc): Unit = {
> acc.IDs = mutable.Set()
> }
>
> overridedefgetValue(acc: IDsAcc): Row = {
> Row.of(acc.IDs.toArray)
> }
>
> overridedefgetResultType: TypeInformation[Row] = {
> newRowTypeInfo(
> createTypeInformation[Array[Long]]
> )
> }
> }
>
> I read the docs [2] but I don't see it really say anything about why
> ListView is better than just using a Set or Array.
> If we were to move from a Set to a ListView what advantages might we see
> in these Aggregates?
>
> I also noticed that ListView existed in 1.11 (we're on 1.11.2), did we
> simply miss this feature? Does it work for 1.11.x too?
>
> Thanks!
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods 
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html 
> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html>
>
> --
>
> Rex Fenley|Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/>| BLOG <http://blog.remind.com/> |
> FOLLOW US <https://twitter.com/remindhq> | LIKE US
> <https://www.facebook.com/remindhq>
>

Reply | Threaded
Open this post in threaded view
|

Re: Why use ListView?

Rex Fenley
Fascinating, do you have an estimate of what qualifies as a lot of data and therefore when this should be used?

Thanks

On Mon, Jan 18, 2021 at 12:35 AM Timo Walther <[hidden email]> wrote:
Hi Rex,

ListView and MapView have been part of Flink for years. However, they
were considered as an internal feature and therefore not well
documented. MapView is used internally to make distinct aggregates work.

Because we reworked the type inference of aggregate functions, we also
added basic documentation for power users.

By default an accumulator will be deserialized from state on every
access. ListView and MapView are not deserialized entirely on access but
delegate directly to a state backend. Thus, only the key that is
accessed is deserialized. So if an accumulator stores a lot of data, it
might be beneficial to use the mentioned abstractions.

Regards,
Timo


On 16.01.21 20:09, Rex Fenley wrote:
> Hello,
>
> In the recent version of Flink docs I read the following [1]:
>  > If an accumulator needs to store large amounts of data,
> |org.apache.flink.table.api.dataview.ListView| and
> |org.apache.flink.table.api.dataview.MapView| provide advanced features
> for leveraging Flink’s state backends in unbounded data scenarios.
> Please see the docs of the corresponding classes for more information
> about this advanced feature.
>
> Our job has unbounded state from Debezium/Kafka, uses RocksDB, and we
> have a number of Aggregators like the following, which group a set of
> ids by some foreign key "group_id". The sets are usually 10-100 ids in
> size, but at the largest the sets could at some theoretical point get
> into the tens of thousands of ids (right now largest sets are ~2000 ids).
>
> table.groupBy($"group_id")
> .aggregate(
> newIDsAgg()(
> $"member_id"
> ) as ("member_ids")
> )
> .select($"group_id", $"member_ids")
>
> caseclassIDsAcc(
> varIDs: mutable.Set[Long]
> )
> classIDsAgg extendsAggregateFunction[Row, IDsAcc] {
>
> overridedefcreateAccumulator(): IDsAcc =
> IDsAcc(mutable.Set())
>
> defaccumulate(
> acc: IDsAcc,
> ID: Long
> ): Unit = {
> acc.IDs.add(ID)
> }
>
> defretract(acc: IDsAcc, ID: Long): Unit = {
> acc.IDs.remove(ID)
> }
>
> defresetAccumulator(acc: IDsAcc): Unit = {
> acc.IDs = mutable.Set()
> }
>
> overridedefgetValue(acc: IDsAcc): Row = {
> Row.of(acc.IDs.toArray)
> }
>
> overridedefgetResultType: TypeInformation[Row] = {
> newRowTypeInfo(
> createTypeInformation[Array[Long]]
> )
> }
> }
>
> I read the docs [2] but I don't see it really say anything about why
> ListView is better than just using a Set or Array.
> If we were to move from a Set to a ListView what advantages might we see
> in these Aggregates?
>
> I also noticed that ListView existed in 1.11 (we're on 1.11.2), did we
> simply miss this feature? Does it work for 1.11.x too?
>
> Thanks!
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html
> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html>
>
> --
>
> Rex Fenley|Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/>| BLOG <http://blog.remind.com/> |
> FOLLOW US <https://twitter.com/remindhq> | LIKE US
> <https://www.facebook.com/remindhq>
>



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Reply | Threaded
Open this post in threaded view
|

Re: Why use ListView?

Timo Walther
As always, this depends on the use case ;-)

In general, you should not get a performance regression in using them.
But keep in mind that ListViews/MapViews cannot be backed by a state
backend in every operator, so sometimes they are represented as
List/Maps on heap.

Regards,
Timo


On 18.01.21 18:28, Rex Fenley wrote:

> Fascinating, do you have an estimate of what qualifies as a lot of data
> and therefore when this should be used?
>
> Thanks
>
> On Mon, Jan 18, 2021 at 12:35 AM Timo Walther <[hidden email]
> <mailto:[hidden email]>> wrote:
>
>     Hi Rex,
>
>     ListView and MapView have been part of Flink for years. However, they
>     were considered as an internal feature and therefore not well
>     documented. MapView is used internally to make distinct aggregates work.
>
>     Because we reworked the type inference of aggregate functions, we also
>     added basic documentation for power users.
>
>     By default an accumulator will be deserialized from state on every
>     access. ListView and MapView are not deserialized entirely on access
>     but
>     delegate directly to a state backend. Thus, only the key that is
>     accessed is deserialized. So if an accumulator stores a lot of data, it
>     might be beneficial to use the mentioned abstractions.
>
>     Regards,
>     Timo
>
>
>     On 16.01.21 20:09, Rex Fenley wrote:
>      > Hello,
>      >
>      > In the recent version of Flink docs I read the following [1]:
>      >  > If an accumulator needs to store large amounts of data,
>      > |org.apache.flink.table.api.dataview.ListView| and
>      > |org.apache.flink.table.api.dataview.MapView| provide advanced
>     features
>      > for leveraging Flink’s state backends in unbounded data scenarios.
>      > Please see the docs of the corresponding classes for more
>     information
>      > about this advanced feature.
>      >
>      > Our job has unbounded state from Debezium/Kafka, uses RocksDB,
>     and we
>      > have a number of Aggregators like the following, which group a
>     set of
>      > ids by some foreign key "group_id". The sets are usually 10-100
>     ids in
>      > size, but at the largest the sets could at some theoretical point
>     get
>      > into the tens of thousands of ids (right now largest sets are
>     ~2000 ids).
>      >
>      > table.groupBy($"group_id")
>      > .aggregate(
>      > newIDsAgg()(
>      > $"member_id"
>      > ) as ("member_ids")
>      > )
>      > .select($"group_id", $"member_ids")
>      >
>      > caseclassIDsAcc(
>      > varIDs: mutable.Set[Long]
>      > )
>      > classIDsAgg extendsAggregateFunction[Row, IDsAcc] {
>      >
>      > overridedefcreateAccumulator(): IDsAcc =
>      > IDsAcc(mutable.Set())
>      >
>      > defaccumulate(
>      > acc: IDsAcc,
>      > ID: Long
>      > ): Unit = {
>      > acc.IDs.add(ID)
>      > }
>      >
>      > defretract(acc: IDsAcc, ID: Long): Unit = {
>      > acc.IDs.remove(ID)
>      > }
>      >
>      > defresetAccumulator(acc: IDsAcc): Unit = {
>      > acc.IDs = mutable.Set()
>      > }
>      >
>      > overridedefgetValue(acc: IDsAcc): Row = {
>      > Row.of(acc.IDs.toArray)
>      > }
>      >
>      > overridedefgetResultType: TypeInformation[Row] = {
>      > newRowTypeInfo(
>      > createTypeInformation[Array[Long]]
>      > )
>      > }
>      > }
>      >
>      > I read the docs [2] but I don't see it really say anything about why
>      > ListView is better than just using a Set or Array.
>      > If we were to move from a Set to a ListView what advantages might
>     we see
>      > in these Aggregates?
>      >
>      > I also noticed that ListView existed in 1.11 (we're on 1.11.2),
>     did we
>      > simply miss this feature? Does it work for 1.11.x too?
>      >
>      > Thanks!
>      >
>      > [1]
>      >
>     https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods
>     <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods>
>
>      >
>     <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods
>     <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods>>
>      > [2]
>      >
>     https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html
>     <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html>
>
>      >
>     <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html
>     <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html>>
>      >
>      > --
>      >
>      > Rex Fenley|Software Engineer - Mobile and Backend
>      >
>      >
>      > Remind.com <https://www.remind.com/ <https://www.remind.com/>>|
>     BLOG <http://blog.remind.com/ <http://blog.remind.com/>> |
>      > FOLLOW US <https://twitter.com/remindhq
>     <https://twitter.com/remindhq>> | LIKE US
>      > <https://www.facebook.com/remindhq
>     <https://www.facebook.com/remindhq>>
>      >
>
>
>
> --
>
> Rex Fenley|Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/>| BLOG <http://blog.remind.com/> |
> FOLLOW US <https://twitter.com/remindhq> | LIKE US
> <https://www.facebook.com/remindhq>
>

Reply | Threaded
Open this post in threaded view
|

Re: Why use ListView?

Rex Fenley
Thanks!

On Tue, Jan 19, 2021 at 12:55 AM Timo Walther <[hidden email]> wrote:
As always, this depends on the use case ;-)

In general, you should not get a performance regression in using them.
But keep in mind that ListViews/MapViews cannot be backed by a state
backend in every operator, so sometimes they are represented as
List/Maps on heap.

Regards,
Timo


On 18.01.21 18:28, Rex Fenley wrote:
> Fascinating, do you have an estimate of what qualifies as a lot of data
> and therefore when this should be used?
>
> Thanks
>
> On Mon, Jan 18, 2021 at 12:35 AM Timo Walther <[hidden email]
> <mailto:[hidden email]>> wrote:
>
>     Hi Rex,
>
>     ListView and MapView have been part of Flink for years. However, they
>     were considered as an internal feature and therefore not well
>     documented. MapView is used internally to make distinct aggregates work.
>
>     Because we reworked the type inference of aggregate functions, we also
>     added basic documentation for power users.
>
>     By default an accumulator will be deserialized from state on every
>     access. ListView and MapView are not deserialized entirely on access
>     but
>     delegate directly to a state backend. Thus, only the key that is
>     accessed is deserialized. So if an accumulator stores a lot of data, it
>     might be beneficial to use the mentioned abstractions.
>
>     Regards,
>     Timo
>
>
>     On 16.01.21 20:09, Rex Fenley wrote:
>      > Hello,
>      >
>      > In the recent version of Flink docs I read the following [1]:
>      >  > If an accumulator needs to store large amounts of data,
>      > |org.apache.flink.table.api.dataview.ListView| and
>      > |org.apache.flink.table.api.dataview.MapView| provide advanced
>     features
>      > for leveraging Flink’s state backends in unbounded data scenarios.
>      > Please see the docs of the corresponding classes for more
>     information
>      > about this advanced feature.
>      >
>      > Our job has unbounded state from Debezium/Kafka, uses RocksDB,
>     and we
>      > have a number of Aggregators like the following, which group a
>     set of
>      > ids by some foreign key "group_id". The sets are usually 10-100
>     ids in
>      > size, but at the largest the sets could at some theoretical point
>     get
>      > into the tens of thousands of ids (right now largest sets are
>     ~2000 ids).
>      >
>      > table.groupBy($"group_id")
>      > .aggregate(
>      > newIDsAgg()(
>      > $"member_id"
>      > ) as ("member_ids")
>      > )
>      > .select($"group_id", $"member_ids")
>      >
>      > caseclassIDsAcc(
>      > varIDs: mutable.Set[Long]
>      > )
>      > classIDsAgg extendsAggregateFunction[Row, IDsAcc] {
>      >
>      > overridedefcreateAccumulator(): IDsAcc =
>      > IDsAcc(mutable.Set())
>      >
>      > defaccumulate(
>      > acc: IDsAcc,
>      > ID: Long
>      > ): Unit = {
>      > acc.IDs.add(ID)
>      > }
>      >
>      > defretract(acc: IDsAcc, ID: Long): Unit = {
>      > acc.IDs.remove(ID)
>      > }
>      >
>      > defresetAccumulator(acc: IDsAcc): Unit = {
>      > acc.IDs = mutable.Set()
>      > }
>      >
>      > overridedefgetValue(acc: IDsAcc): Row = {
>      > Row.of(acc.IDs.toArray)
>      > }
>      >
>      > overridedefgetResultType: TypeInformation[Row] = {
>      > newRowTypeInfo(
>      > createTypeInformation[Array[Long]]
>      > )
>      > }
>      > }
>      >
>      > I read the docs [2] but I don't see it really say anything about why
>      > ListView is better than just using a Set or Array.
>      > If we were to move from a Set to a ListView what advantages might
>     we see
>      > in these Aggregates?
>      >
>      > I also noticed that ListView existed in 1.11 (we're on 1.11.2),
>     did we
>      > simply miss this feature? Does it work for 1.11.x too?
>      >
>      > Thanks!
>      >
>      > [1]
>      >
>     https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods
>     <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods>
>
>      >
>     <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods
>     <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods>>
>      > [2]
>      >
>     https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html
>     <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html>
>
>      >
>     <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html
>     <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html>>
>      >
>      > --
>      >
>      > Rex Fenley|Software Engineer - Mobile and Backend
>      >
>      >
>      > Remind.com <https://www.remind.com/ <https://www.remind.com/>>|
>     BLOG <http://blog.remind.com/ <http://blog.remind.com/>> |
>      > FOLLOW US <https://twitter.com/remindhq
>     <https://twitter.com/remindhq>> | LIKE US
>      > <https://www.facebook.com/remindhq
>     <https://www.facebook.com/remindhq>>
>      >
>
>
>
> --
>
> Rex Fenley|Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/>| BLOG <http://blog.remind.com/> |
> FOLLOW US <https://twitter.com/remindhq> | LIKE US
> <https://www.facebook.com/remindhq>
>



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US