How to report metric based on keyed state piece

Salva Alcántara
I wonder what is the canonical way to accomplish the following:

Given a Flink UDF, how can I report a metric `y` that is a function of some
(keyed) state `X`, i.e., `y=f(X)`? Most commonly, we are interested in
the size of the state `X`.

For instance, consider a `CoFlatMap` function with:

- `X` being a `MapState`
- `y` (the metric) consisting of the aggregated size (i.e., the total size
of the `MapState`, for all keys)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: How to report metric based on keyed state piece

Kezhu Wang
Starting from an initial `y`, I think you could compute the new `y` on each new stream value. Upon recovering from a checkpoint, maybe `KeyedStateBackend.applyToAllKeys` could help you rebuild the initial `y`.

Best,
Kezhu Wang

Re: How to report metric based on keyed state piece

Piotr Nowojski-4
Hi Salva,

I'm not sure, but I think you cannot access the state (especially the keyed state) from within the metric, as metrics are evaluated outside of the keyed context, and also from another thread. Also, things like `ValueState`/`MapState` do not expose any size.

So you would probably have to follow Kezhu's suggestion. Whenever you update your state value, you can also update a shared variable that tracks the combined size (`AtomicLong`?). Upon recovery you would need to reinitialize it (maybe indeed with `KeyedStateBackend.applyToAllKeys`).

Piotrek
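
A minimal sketch of the pattern suggested above, written against plain Java collections so it runs without Flink on the classpath. Everything here is a stand-in and an assumption for illustration: the nested `HashMap` plays the role of the keyed `MapState`, the `Supplier<Long>` plays the role of a gauge, and the class and method names (`KeyedSizeMetricSketch`, `put`, `remove`, `rebuildAfterRestore`, `reportedSize`) are hypothetical. In a real job the gauge would presumably be registered through `getRuntimeContext().getMetricGroup().gauge(...)`, and the rebuild step would have to go through the state backend rather than a plain loop.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

public class KeyedSizeMetricSketch {
    // Stand-in for keyed MapState: outer key = stream key, inner map = that key's MapState.
    private final Map<String, Map<String, Long>> stateByKey = new HashMap<>();

    // Shared counter; safe to read from a metrics thread outside any keyed context.
    private final AtomicLong totalEntries = new AtomicLong();

    // Stand-in for a gauge: it only reads the counter and never touches the state.
    private final Supplier<Long> gauge = totalEntries::get;

    // Simulates an operator that starts from restored state (empty map = fresh start).
    public KeyedSizeMetricSketch(Map<String, Map<String, Long>> restored) {
        for (Map.Entry<String, Map<String, Long>> e : restored.entrySet()) {
            stateByKey.put(e.getKey(), new HashMap<>(e.getValue()));
        }
        rebuildAfterRestore();
    }

    // Update the state and the counter together; overwrites do not change the size.
    public void put(String key, String field, long value) {
        Map<String, Long> mapState = stateByKey.computeIfAbsent(key, k -> new HashMap<>());
        if (mapState.put(field, value) == null) {
            totalEntries.incrementAndGet();
        }
    }

    // Decrement only when an entry was actually removed.
    public void remove(String key, String field) {
        Map<String, Long> mapState = stateByKey.get(key);
        if (mapState != null && mapState.remove(field) != null) {
            totalEntries.decrementAndGet();
        }
    }

    // Recompute the counter by visiting every key once (the role applyToAllKeys might play).
    public void rebuildAfterRestore() {
        long total = 0;
        for (Map<String, Long> mapState : stateByKey.values()) {
            total += mapState.size();
        }
        totalEntries.set(total);
    }

    // What the metrics reporter would see.
    public long reportedSize() {
        return gauge.get();
    }

    public static void main(String[] args) {
        KeyedSizeMetricSketch fn = new KeyedSizeMetricSketch(new HashMap<>());
        fn.put("a", "x", 1L);
        fn.put("a", "y", 2L);
        fn.put("b", "x", 3L);
        fn.put("a", "x", 9L); // overwrite, size unchanged
        fn.remove("b", "x");
        System.out.println("reported size = " + fn.reportedSize());
    }
}
```

Note that in Flink each parallel subtask has its own function instance, so a counter like this would only cover the keys owned by that subtask; the per-subtask values would still need to be summed on the monitoring side.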



Re: How to report metric based on keyed state piece

Salva Alcántara
In reply to this post by Kezhu Wang
Many thanks, Kezhu, for pointing me in that direction!



Re: How to report metric based on keyed state piece

Salva Alcántara
In reply to this post by Piotr Nowojski-4