Flink OnCheckpointRollingPolicy streamingfilesink

Vijayendra Yadav
Hi Team,

Bulk Formats can only have `OnCheckpointRollingPolicy`, which rolls (ONLY) on every checkpoint.  

.withRollingPolicy(OnCheckpointRollingPolicy.build())

Question: What are the recommended checkpointing values when checkpointing to fsstate? Should checkpoints be more frequent or at longer intervals, how many concurrent checkpoints should be allowed, and what is an ideal pause between checkpoints?
Is there a way to control the batch size here other than by time? Any recommendations for the parameters listed below?
Note: I am trying to improve sink throughput.


env.enableCheckpointing(chckptintervalmilli)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.valueOf(ChckptMode))
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(chckptintervalmilligap)
env.getCheckpointConfig.setCheckpointTimeout(chckptduration)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(concurrentchckpt)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.valueOf(CheckpointCleanup))
env.getCheckpointConfig.setPreferCheckpointForRecovery(CheckpointForCleanup)

Thanks,
Vijay

Re: Flink OnCheckpointRollingPolicy streamingfilesink

Andrey Zagrebin
Hi Vijay,

I think it depends on your job requirements, in particular how many records are processed per second and how many resources you have to process them.

If the checkpointing interval is short, the checkpointing overhead can be too high and you need more resources to keep up with the incoming stream efficiently.

If the checkpointing interval is long, more records are batched together and the throughput is better.
On the other hand, the observed latency is higher, because the batched results are flushed into the files and become visible in the external system only when a checkpoint occurs, which is what provides the exactly-once guarantee.
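
To make the batching effect concrete, here is a minimal back-of-the-envelope sketch in Scala. It is not Flink code: `CheckpointIntervalEstimate`, `RollEstimate`, and `estimate` are hypothetical names, and the input numbers are made up. It assumes records are spread evenly over the sink subtasks, each subtask writes to a single bucket, and part files roll only on checkpoints (as with `OnCheckpointRollingPolicy`). It ignores the time a checkpoint itself takes.

```scala
// Hypothetical helper for rough sizing; not part of the Flink API.
object CheckpointIntervalEstimate {

  // recordsPerPartFile: approximate records written to one part file between two checkpoints.
  // maxVisibilityDelayMs: roughly how long a record can wait before its file is finalized.
  final case class RollEstimate(recordsPerPartFile: Long, maxVisibilityDelayMs: Long)

  def estimate(recordsPerSecond: Long, sinkParallelism: Int, checkpointIntervalMs: Long): RollEstimate = {
    require(sinkParallelism > 0, "sinkParallelism must be positive")
    require(checkpointIntervalMs > 0, "checkpointIntervalMs must be positive")
    // Records arriving across all subtasks during one checkpoint interval.
    val recordsPerInterval = recordsPerSecond * checkpointIntervalMs / 1000
    // Assumption: even distribution over subtasks, one bucket per subtask.
    RollEstimate(recordsPerInterval / sinkParallelism, checkpointIntervalMs)
  }

  def main(args: Array[String]): Unit = {
    // Illustrative inputs only: 50,000 records/s, sink parallelism 4.
    for (intervalMs <- Seq(10000L, 60000L, 300000L)) {
      val e = estimate(recordsPerSecond = 50000L, sinkParallelism = 4, checkpointIntervalMs = intervalMs)
      println(s"interval=$intervalMs ms: ~${e.recordsPerPartFile} records per part file, " +
        s"visible after up to ~${e.maxVisibilityDelayMs} ms")
    }
  }
}
```

Under these assumptions the checkpoint interval is the only knob for batch size: a longer interval gives larger part files, and the time until results become visible grows by the same factor.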

Best,
Andrey

On Mon, Aug 24, 2020 at 6:18 PM Vijayendra Yadav <[hidden email]> wrote:

Re: Flink OnCheckpointRollingPolicy streamingfilesink

Vijayendra Yadav
Hi Andrey,

Thanks.
What is the recommendation for env.getCheckpointConfig.setMaxConcurrentCheckpoints(concurrentchckpt)?

Should it be 1 or higher, and based on what factors?


Regards,
Vijay


On Tue, Aug 25, 2020 at 8:55 AM Andrey Zagrebin <[hidden email]> wrote:

Re: Flink OnCheckpointRollingPolicy streamingfilesink

Andrey Zagrebin
Hi Vijay,

I would apply the same judgement. It is latency vs. throughput vs. spent resources vs. practical need.

The more concurrent checkpoints your system is capable of handling, the better the end-to-end result latency you will observe, and the more frequently you will see computation results.
On the other hand, your system needs to provide more resources (maybe higher parallelism) to process more concurrent checkpoints.

Again, the fewer the checkpoints, the more records are batched together and the better the throughput.

It usually does not make sense to have a large number of concurrent checkpoints that process only a handful of records in between if you do not observe any practical decrease in latency.
The system will just waste resources processing the checkpoints.
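
As a starting point, the settings from the original question could be filled in roughly as below. This is only a sketch, assuming the Flink 1.11 Scala DataStream API; the object name `CheckpointSettingsSketch` and all numeric values are illustrative assumptions, not recommendations, and should be tuned against your own load. Per the Flink checkpointing documentation, defining a minimum pause between checkpoints implies that the number of concurrent checkpoints is one, so the sketch keeps that setting at 1.

```scala
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical wrapper object; all values are placeholders to adjust per job.
object CheckpointSettingsSketch {

  def configure(env: StreamExecutionEnvironment): Unit = {
    // Checkpoint (and therefore roll part files) every 60 s with exactly-once semantics.
    env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE)

    val conf = env.getCheckpointConfig
    // Leave at least 30 s of pure processing time between two checkpoints.
    conf.setMinPauseBetweenCheckpoints(30000L)
    // Abort a checkpoint that has not completed after 10 minutes.
    conf.setCheckpointTimeout(600000L)
    // One checkpoint in flight at a time; raise only if latency measurably improves.
    conf.setMaxConcurrentCheckpoints(1)
    // Keep the last checkpoint when the job is cancelled so it can be restored manually.
    conf.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    configure(env)
    // Sources, transformations, the StreamingFileSink, and env.execute(...) would follow here.
  }
}
```

Starting with a single in-flight checkpoint and a longer interval favors throughput. Shorten the interval only if the delay before files become visible is too long for the downstream consumers.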

Best,
Andrey

On Fri, Aug 28, 2020 at 9:52 PM Vijayendra Yadav <[hidden email]> wrote:

Re: Flink OnCheckpointRollingPolicy streamingfilesink

Vijayendra Yadav
Thank you, Andrey.

Regards,
Vijay


On Aug 29, 2020, at 3:38 AM, Andrey Zagrebin <[hidden email]> wrote:

