How to read UI checkpoints section

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

How to read UI checkpoints section

Andrea Spina
Dear community,
I'm running a job with flink-1.6.4 and the following configs about checkpointing:

2019-07-15 12:28:32,653 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'hdfs://rbl1.radicalbit.io:8020/flink/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE}
Screenshot 2019-07-15 at 15.51.48.png

Long story short: I'd like to run checkpoints against RocksDB quite large state asynchronously and incrementally. What it actually reports my history is described below 
Screenshot 2019-07-15 at 16.31.03.png

I was expecting was State Size that is more or less fixed across checkpoints since the checkpoint mechanism is incremental and delta-based, but the state is actually every increasing and duration is always greater than the previous. Is this column representing just the delta size or the whole state size?

If the checkpointing is still incremental, why the board is reporting always increasing metrics in terms of time and size? 

Thank you very much for your help,

--
Andrea Spina
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT
Reply | Threaded
Open this post in threaded view
|

Re: How to read UI checkpoints section

Fabian Hueske-2
Hi Andrea,

The reported state size is the total size of the checkpoint (AFAIK).

Regarding the incremental checkpointing, this is only helpful if not all keys are updated between two checkpoints.
As soon as a key was touched, it needs to be synced. If all (or most of) your data changes between two checkpoints, incremental checkpoints do not help.

Best, Fabian

Am Mo., 15. Juli 2019 um 17:18 Uhr schrieb Andrea Spina <[hidden email]>:
Dear community,
I'm running a job with flink-1.6.4 and the following configs about checkpointing:

2019-07-15 12:28:32,653 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'hdfs://rbl1.radicalbit.io:8020/flink/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE}
Screenshot 2019-07-15 at 15.51.48.png

Long story short: I'd like to run checkpoints against RocksDB quite large state asynchronously and incrementally. What it actually reports my history is described below 
Screenshot 2019-07-15 at 16.31.03.png

I was expecting was State Size that is more or less fixed across checkpoints since the checkpoint mechanism is incremental and delta-based, but the state is actually every increasing and duration is always greater than the previous. Is this column representing just the delta size or the whole state size?

If the checkpointing is still incremental, why the board is reporting always increasing metrics in terms of time and size? 

Thank you very much for your help,

--
Andrea Spina
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT
Reply | Threaded
Open this post in threaded view
|

Re: How to read UI checkpoints section

Congxian Qiu
Hi Andrea

1. for Incremental mode, the column of state size is the incremental size(this column is represented by the sum of all state handles, but the PlaceholderStreamStateHandle, used in incremental, always return 0 when calling getStateSize())
2. for the second question, maybe we need to dig it deeper to find out the reason.

Best,
Congxian


Fabian Hueske <[hidden email]> 于2019年7月23日周二 下午6:28写道:
Hi Andrea,

The reported state size is the total size of the checkpoint (AFAIK).

Regarding the incremental checkpointing, this is only helpful if not all keys are updated between two checkpoints.
As soon as a key was touched, it needs to be synced. If all (or most of) your data changes between two checkpoints, incremental checkpoints do not help.

Best, Fabian

Am Mo., 15. Juli 2019 um 17:18 Uhr schrieb Andrea Spina <[hidden email]>:
Dear community,
I'm running a job with flink-1.6.4 and the following configs about checkpointing:

2019-07-15 12:28:32,653 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'hdfs://rbl1.radicalbit.io:8020/flink/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE}
Screenshot 2019-07-15 at 15.51.48.png

Long story short: I'd like to run checkpoints against RocksDB quite large state asynchronously and incrementally. What it actually reports my history is described below 
Screenshot 2019-07-15 at 16.31.03.png

I was expecting was State Size that is more or less fixed across checkpoints since the checkpoint mechanism is incremental and delta-based, but the state is actually every increasing and duration is always greater than the previous. Is this column representing just the delta size or the whole state size?

If the checkpointing is still incremental, why the board is reporting always increasing metrics in terms of time and size? 

Thank you very much for your help,

--
Andrea Spina
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT
Reply | Threaded
Open this post in threaded view
|

Re: How to read UI checkpoints section

Fabian Hueske-2
Thanks for correcting me Congxian!
I guess, we should rename "State Size" to "Checkpointed Data Size" or something like that to make it more clear.

If the amount of checkpointed data grows, checkpointing will simply take more time.
Nonetheless, 38 minutes for 100GB seems not right.

Cheers, Fabian

Am Di., 23. Juli 2019 um 17:12 Uhr schrieb Congxian Qiu <[hidden email]>:
Hi Andrea

1. for Incremental mode, the column of state size is the incremental size(this column is represented by the sum of all state handles, but the PlaceholderStreamStateHandle, used in incremental, always return 0 when calling getStateSize())
2. for the second question, maybe we need to dig it deeper to find out the reason.

Best,
Congxian


Fabian Hueske <[hidden email]> 于2019年7月23日周二 下午6:28写道:
Hi Andrea,

The reported state size is the total size of the checkpoint (AFAIK).

Regarding the incremental checkpointing, this is only helpful if not all keys are updated between two checkpoints.
As soon as a key was touched, it needs to be synced. If all (or most of) your data changes between two checkpoints, incremental checkpoints do not help.

Best, Fabian

Am Mo., 15. Juli 2019 um 17:18 Uhr schrieb Andrea Spina <[hidden email]>:
Dear community,
I'm running a job with flink-1.6.4 and the following configs about checkpointing:

2019-07-15 12:28:32,653 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'hdfs://rbl1.radicalbit.io:8020/flink/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE}
Screenshot 2019-07-15 at 15.51.48.png

Long story short: I'd like to run checkpoints against RocksDB quite large state asynchronously and incrementally. What it actually reports my history is described below 
Screenshot 2019-07-15 at 16.31.03.png

I was expecting was State Size that is more or less fixed across checkpoints since the checkpoint mechanism is incremental and delta-based, but the state is actually every increasing and duration is always greater than the previous. Is this column representing just the delta size or the whole state size?

If the checkpointing is still incremental, why the board is reporting always increasing metrics in terms of time and size? 

Thank you very much for your help,

--
Andrea Spina
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT
Reply | Threaded
Open this post in threaded view
|

Re: How to read UI checkpoints section

Yun Tang
Hi Fabian,

Several months ago, for some internal purpose (monitor the overall state size as one of the factors to decide whether to scale up/down). We add a new method named #getFullStateSize() in AbstractCheckpointStats to return the overall state size in our internal Flink. This would have to add similar methods to OperatorSubtaskState, SubtaskStateStats, StateObjectCollection and so on. It seems not every developer could know the exact meaning of state size changed when executing incremental checkpoints. I just create a issue https://issues.apache.org/jira/browse/FLINK-13390 to track this problem.

For Andrea's question, if your job come across really high back pressure or process elements really slowly. The checkpoint barrier of all channels can not be sent to downstream, which might explain why your 'Buffered during alighment' did not change much compared to the checkpoint duration increase.

Best
Yun Tang

From: Fabian Hueske <[hidden email]>
Sent: Tuesday, July 23, 2019 23:45
To: Congxian Qiu <[hidden email]>
Cc: Andrea Spina <[hidden email]>; user <[hidden email]>
Subject: Re: How to read UI checkpoints section
 
Thanks for correcting me Congxian!
I guess, we should rename "State Size" to "Checkpointed Data Size" or something like that to make it more clear.

If the amount of checkpointed data grows, checkpointing will simply take more time.
Nonetheless, 38 minutes for 100GB seems not right.

Cheers, Fabian

Am Di., 23. Juli 2019 um 17:12 Uhr schrieb Congxian Qiu <[hidden email]>:
Hi Andrea

1. for Incremental mode, the column of state size is the incremental size(this column is represented by the sum of all state handles, but the PlaceholderStreamStateHandle, used in incremental, always return 0 when calling getStateSize())
2. for the second question, maybe we need to dig it deeper to find out the reason.

Best,
Congxian


Fabian Hueske <[hidden email]> 于2019年7月23日周二 下午6:28写道:
Hi Andrea,

The reported state size is the total size of the checkpoint (AFAIK).

Regarding the incremental checkpointing, this is only helpful if not all keys are updated between two checkpoints.
As soon as a key was touched, it needs to be synced. If all (or most of) your data changes between two checkpoints, incremental checkpoints do not help.

Best, Fabian

Am Mo., 15. Juli 2019 um 17:18 Uhr schrieb Andrea Spina <[hidden email]>:
Dear community,
I'm running a job with flink-1.6.4 and the following configs about checkpointing:

2019-07-15 12:28:32,653 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'hdfs://rbl1.radicalbit.io:8020/flink/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE}
Screenshot 2019-07-15 at 15.51.48.png

Long story short: I'd like to run checkpoints against RocksDB quite large state asynchronously and incrementally. What it actually reports my history is described below 
Screenshot 2019-07-15 at 16.31.03.png

I was expecting was State Size that is more or less fixed across checkpoints since the checkpoint mechanism is incremental and delta-based, but the state is actually every increasing and duration is always greater than the previous. Is this column representing just the delta size or the whole state size?

If the checkpointing is still incremental, why the board is reporting always increasing metrics in terms of time and size? 

Thank you very much for your help,

--
Andrea Spina
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT