Per Operator State Monitoring

Aaron Langford
Hey Flink Community,

I'm working on a Flink application in which our operators extend RichFlatMapFunction and RichCoFlatMapFunction, so we work directly with Flink's state API (ValueState, ListState, MapState). A way to monitor the state size of each operator would be extremely valuable. My team has already run into a few cases where our state exploded and jobs failed because YARN kills containers that exceed their memory limits.

My understanding is that the best way to monitor this is to watch the checkpoint size per operator instance. That gets confusing with incremental checkpointing, because the reported numbers appear to be deltas in state size rather than the actual state size at that point in time. For my team's application, the total state size is not the sum of those deltas. What is the best way to get the total size of a checkpoint per operator, for each checkpoint?
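To illustrate why the deltas do not add up to the total, here is a minimal sketch in plain Java (no Flink dependency). It assumes a simplified model of incremental checkpoints with the RocksDB backend: each checkpoint references a set of SST files, only files absent from the previous checkpoint count towards the reported delta, and compaction replaces several old files with one new file. The class name, file names, and sizes are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class IncrementalCheckpointModel {

    /** Bytes newly uploaded by a checkpoint: files the previous checkpoint did not reference. */
    public static long deltaSize(Map<String, Long> previous, Map<String, Long> current) {
        long sum = 0;
        for (Map.Entry<String, Long> file : current.entrySet()) {
            if (!previous.containsKey(file.getKey())) {
                sum += file.getValue();
            }
        }
        return sum;
    }

    /** Total bytes referenced by a checkpoint, whether or not they were uploaded earlier. */
    public static long fullSize(Map<String, Long> current) {
        long sum = 0;
        for (long size : current.values()) {
            sum += size;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Checkpoint 1: two fresh SST files.
        Map<String, Long> cp1 = new HashMap<>();
        cp1.put("001.sst", 100L);
        cp1.put("002.sst", 100L);

        // Checkpoint 2: the same files plus one new file.
        Map<String, Long> cp2 = new HashMap<>(cp1);
        cp2.put("003.sst", 50L);

        // Checkpoint 3: compaction merged all three files into one smaller file.
        Map<String, Long> cp3 = new HashMap<>();
        cp3.put("004.sst", 180L);

        long sumOfDeltas = deltaSize(new HashMap<>(), cp1)  // 200
                + deltaSize(cp1, cp2)                       // 50
                + deltaSize(cp2, cp3);                      // 180

        System.out.println("sum of deltas      = " + sumOfDeltas);   // 430
        System.out.println("full size at cp 3  = " + fullSize(cp3)); // 180
    }
}
```

In this model the sum of deltas overstates the state size after a compaction, and any single delta understates it, which is consistent with the behaviour described above.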

Additionally, I haven't yet seen a good story for monitoring state serialization and deserialization in a Flink application. Badly written Flink operators seem to perform worst when they demand a lot of serde for each record, so visibility into how well an application handles these operations would be a valuable guard rail for Flink developers. Does anyone have an existing solution for this, or pointers to work that could be done to improve it?
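One possible direction, sketched in plain Java, is to wrap the serialization and deserialization functions and count calls, bytes, and elapsed time. `SerdeTimer` and its accessors are hypothetical names, not part of Flink; in a real job the same idea would presumably be applied around a custom serializer, with the counters exposed through the operator's metric group.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;

/** Hypothetical wrapper that records how often and how long serde runs. */
public class SerdeTimer<T> {
    private final Function<T, byte[]> serializer;
    private final Function<byte[], T> deserializer;

    private final LongAdder serializeCalls = new LongAdder();
    private final LongAdder serializeNanos = new LongAdder();
    private final LongAdder serializedBytes = new LongAdder();
    private final LongAdder deserializeCalls = new LongAdder();
    private final LongAdder deserializeNanos = new LongAdder();

    public SerdeTimer(Function<T, byte[]> serializer, Function<byte[], T> deserializer) {
        this.serializer = serializer;
        this.deserializer = deserializer;
    }

    /** Serializes the value and records call count, output size, and elapsed time. */
    public byte[] serialize(T value) {
        long start = System.nanoTime();
        byte[] bytes = serializer.apply(value);
        serializeNanos.add(System.nanoTime() - start);
        serializeCalls.increment();
        serializedBytes.add(bytes.length);
        return bytes;
    }

    /** Deserializes the bytes and records call count and elapsed time. */
    public T deserialize(byte[] bytes) {
        long start = System.nanoTime();
        T value = deserializer.apply(bytes);
        deserializeNanos.add(System.nanoTime() - start);
        deserializeCalls.increment();
        return value;
    }

    public long serializeCalls() { return serializeCalls.sum(); }
    public long serializeNanos() { return serializeNanos.sum(); }
    public long serializedBytes() { return serializedBytes.sum(); }
    public long deserializeCalls() { return deserializeCalls.sum(); }
    public long deserializeNanos() { return deserializeNanos.sum(); }

    public static void main(String[] args) {
        SerdeTimer<String> timer = new SerdeTimer<>(
                s -> s.getBytes(StandardCharsets.UTF_8),
                b -> new String(b, StandardCharsets.UTF_8));
        String roundTrip = timer.deserialize(timer.serialize("hello"));
        System.out.println(roundTrip + ": " + timer.serializedBytes() + " bytes in "
                + timer.serializeCalls() + " serialize call(s)");
    }
}
```

Per-call timing with `System.nanoTime()` adds overhead of its own, so sampling only a fraction of calls may be preferable on hot paths.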

Aaron
Re: Per Operator State Monitoring

Piotr Nowojski-3
Hi,

I’m not sure if there is some simple way of doing that (maybe some other contributors will know more).

There are two potential ideas worth exploring:
- Use periodically triggered savepoints for monitoring? If I remember correctly, savepoints are never incremental.
- Use the savepoint input/output formats to analyse the contents of a savepoint? [1]

I hope that someone else from the community will be able to help more here.

Piotrek

[1] https://flink.apache.org/feature/2019/09/13/state-processor-api.html
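As a rough sketch of the first idea: if savepoints are full snapshots, the size of a savepoint directory approximates the job's total state size at that moment. The plain-Java helper below sums the file sizes under a directory. It assumes the savepoint was written to a locally accessible path (for HDFS or S3 the matching file system client would be needed), and `SavepointSize` is a hypothetical name. It yields a total for the whole job, not a per-operator breakdown; that would need the State Processor API from [1].

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class SavepointSize {

    /** Sums the sizes of all regular files below the given directory, recursively. */
    public static long directorySizeBytes(Path dir) throws IOException {
        try (Stream<Path> paths = Files.walk(dir)) {
            return paths
                    .filter(Files::isRegularFile)
                    .mapToLong(p -> {
                        try {
                            return Files.size(p);
                        } catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                    })
                    .sum();
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: SavepointSize <savepoint-directory>");
            return;
        }
        System.out.println(directorySizeBytes(Paths.get(args[0])) + " bytes");
    }
}
```

Running this after each periodically triggered savepoint and recording the result would give a time series of total state size.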

In reply to Aaron Langford's message of 22 Nov 2019, 22:48.

Re: Per Operator State Monitoring

Yu Li
Hi Aaron,

I don't think we have such fine-grained metrics for per-operator state size. However, from your description of YARN killing containers that exceed their memory limits, I think the root cause is not the state size itself but the memory consumption of the state backend.

My guess is that you are using the RocksDB state backend: with the heap backend you won't exceed the JVM's Xmx limit and are unlikely to get killed by YARN (unless you spawn a huge number of threads in your operator logic, in which case it has nothing to do with state). If I'm correct, could you carefully check your memory settings to make sure they cover all of RocksDB's memory usage [1]? You may also find some good solutions in FLINK-7289 [2] for preventing RocksDB memory leaks.
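A back-of-the-envelope check of the memory settings might look like the sketch below. It rests on assumptions that are not stated in this thread: each registered state is backed by its own RocksDB column family with its own write buffers and block cache, and index and filter blocks plus other native allocations are ignored, so real usage can be higher. The class name, parameter names, and numbers are illustrative only.

```java
public class RocksDbMemoryEstimate {

    /**
     * Rough native memory estimate for RocksDB in one TaskManager.
     * Assumption: every state in every slot has its own write buffers and block cache.
     */
    public static long estimateBytes(int statesPerSlot,
                                     int slots,
                                     long writeBufferBytes,
                                     int maxWriteBuffers,
                                     long blockCacheBytes) {
        long perState = writeBufferBytes * maxWriteBuffers + blockCacheBytes;
        return perState * statesPerSlot * slots;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // Illustrative values: 5 states per slot, 4 slots,
        // 64 MB write buffers (2 per state), 8 MB block cache per state.
        long estimate = estimateBytes(5, 4, 64 * mb, 2, 8 * mb);
        System.out.println("estimated RocksDB native memory: " + (estimate / mb) + " MB"); // 2720 MB
    }
}
```

If the estimate plus the JVM heap and other off-heap usage exceeds the container size requested from YARN, the container is at risk of being killed even though the heap itself stays within Xmx.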

We're working hard to provide a much easier way to control the total memory of the RocksDB backend in the 1.10 release (targeted for Jan. 2020). Sorry that, for the time being, you have to understand some RocksDB internals.

Hope the information helps.

Best Regards,
Yu



In reply to Piotr Nowojski's message of Mon, 25 Nov 2019, 21:28.