Checkpoint timeouts at times of high load

Checkpoint timeouts at times of high load

Geldenhuys, Morgan Karl

Hi Community,


I have a number of Flink jobs running inside my session cluster with varying checkpoint intervals and a large amount of operator state. At times of high load, the jobs fail due to checkpoint timeouts (the timeout is set to 6 minutes). I can only assume this is because checkpoint latencies increase under high load. I have a 30-node HDFS cluster for checkpoints, yet I see that only 4 of these nodes are being used for storage. Is there a way of ensuring the load is spread evenly? Could there be another reason for these checkpoint timeouts? Events are consumed from Kafka and written back to Kafka with EXACTLY_ONCE guarantees enabled.


Thank you very much!


M.

Re: Checkpoint timeouts at times of high load

Guowei Ma
Hi,
I think there are many reasons that could lead to a checkpoint timeout.
Could you share some detailed information about the checkpoints, for example the detailed checkpoint statistics from the web UI [1]? And which Flink version are you using?

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/checkpoint_monitoring.html
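
If the web UI is hard to reach, the same statistics are also exposed by the JobManager's REST API. A minimal sketch of pulling them, assuming Java 11's HttpClient; the REST address and job id below are placeholders, not values from this thread:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Fetch the checkpoint statistics (counts, durations, state sizes) for one job.
// "localhost:8081" and the job id are assumptions; substitute your own values.
String jobId = "<your-job-id>";
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder(
        URI.create("http://localhost:8081/jobs/" + jobId + "/checkpoints"))
    .GET()
    .build();
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
System.out.println(response.body());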

Best,
Guowei



Re: Checkpoint timeouts at times of high load

Geldenhuys, Morgan Karl

Thank you for the information. I have a feeling this has more to do with the EXACTLY_ONCE Kafka producers and their transactions not playing nicely with checkpoints, and a timeout happens. The jobs seem to fail and then hit a restart-and-fail loop. Looking at the logs, the taskmanager logs grow very large with the same messages repeating over and over again. I've attached a file for this. The two lines that give me pause are:


Closing the Kafka producer with timeoutMillis = 0 ms.

Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.


I'm not really sure which timeout this is, but it looks like there is a timeout loop happening here.


The Kafka producer has been configured as follows (the transaction timeout on the Kafka broker has been set to match the producer):


Properties kafkaProducerProps = new Properties();
kafkaProducerProps.setProperty("bootstrap.servers", brokerList);
kafkaProducerProps.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "3600000");
kafkaProducerProps.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
kafkaProducerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());
kafkaProducerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
kafkaProducerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "500");
kafkaProducerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
kafkaProducerProps.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
kafkaProducerProps.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
kafkaProducerProps.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");

FlinkKafkaProducer<String> myProducer =
    new FlinkKafkaProducer<>(
        producerTopic,
        (KafkaSerializationSchema<String>) (value, aLong) ->
            new ProducerRecord<>(producerTopic, value.getBytes()),
        kafkaProducerProps,
        Semantic.EXACTLY_ONCE,
        10);


And checkpointing has been configured as follows:


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// configuring RocksDB state backend to use HDFS
String backupFolder = props.getProperty("hdfs.backupFolder");
StateBackend backend = new RocksDBStateBackend(backupFolder, true);
env.setStateBackend(backend);
// start a checkpoint based on supplied interval
env.enableCheckpointing(checkpointInterval);
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure progress happens between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(checkpointInterval);
// checkpoints have to complete within roughly six minutes (380 seconds), or are discarded
env.getCheckpointConfig().setCheckpointTimeout(380000);
//env.getCheckpointConfig().setTolerableCheckpointFailureNumber();
// no external services which could take some time to respond, therefore
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are deleted after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

Additionally, each TaskManager has been configured with 4 GB of memory, the job uses a sliding window of 10 seconds with a slide of 1 second (a rough sketch of such a window definition is below), and the cluster is deployed using Flink native.
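
For context, a minimal sketch of what such a window definition might look like. The source, key and aggregation are placeholders and the sketch uses processing time, so it is not the actual job code:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Placeholder source; the real job consumes from Kafka.
DataStream<Tuple2<String, Long>> events =
        env.fromElements(Tuple2.of("key", 1L), Tuple2.of("key", 2L));

// A 10 second window sliding by 1 second, as described above. With this tiny
// bounded example source the job may finish before a window ever fires.
events.keyBy(t -> t.f0)
        .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(1)))
        .sum(1)
        .print();

env.execute("sliding-window-sketch");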


Any hints would be much appreciated!


Regards,

M.





Attachment: taskmanager_logs.log (29K)

Re: Checkpoint timeouts at times of high load

rmetzger0
It could very well be that your job gets stuck in a restart loop for some reason. Can you either post the full TaskManager logs here, or try to figure out yourself why the first checkpoint that timed out did so?
Backpressure or blocked operators are a common cause for this. In your case, it could very well be that the Kafka producer is not confirming the checkpoint because of the Kafka transactions. If backpressure is causing this, consider enabling unaligned checkpoints. It could also be the case that the Kafka transactions are too slow, causing backpressure and checkpoint timeouts?!
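
If you do try unaligned checkpoints, a minimal sketch of enabling them on top of the checkpoint configuration posted earlier might look like this; the interval is a placeholder, and you should check the limitations for your Flink version (e.g. exactly-once mode and a single concurrent checkpoint):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000L); // placeholder interval
// Let checkpoint barriers overtake buffered in-flight records, so heavy
// backpressure no longer stalls barrier alignment.
env.getCheckpointConfig().enableUnalignedCheckpoints();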


