Hi Community,
I have a number of Flink jobs running inside my session cluster with varying checkpoint intervals and a large amount of operator state. In times of high load, the jobs fail due to checkpoint timeouts (set to 6 minutes). I can only assume this is because checkpointing latencies increase under high load. I have a 30-node HDFS cluster for checkpoints, however I see that only 4 of these nodes are being used for storage. Is there a way of ensuring the load is spread evenly? Could there be another reason for these checkpoint timeouts? Events are consumed from Kafka and written back to Kafka with EXACTLY_ONCE guarantees enabled.
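For reference, a minimal sketch of the kind of checkpoint configuration described above, using Flink 1.12-style APIs. The HDFS path and checkpoint interval are placeholders; only the 6-minute timeout is taken from the description.

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Periodic checkpoints in exactly-once mode (placeholder interval).
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        // The 6-minute checkpoint timeout mentioned above.
        env.getCheckpointConfig().setCheckpointTimeout(6 * 60 * 1000);

        // Checkpoints are written to HDFS (placeholder namenode address and path).
        env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));
    }
}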
Thank you very much!
M.
Hi,

I think there are many reasons that could lead to the checkpoint timeout. Would you like to share some detailed checkpoint information, for example the detailed checkpoint statistics from the web UI [1]? And which Flink version do you use?

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/checkpoint_monitoring.html

Best,
Guowei
Thank you for the information. I have a feeling this has more to do with the EXACTLY_ONCE Kafka producers and their transactions not playing nicely with checkpoints, so that a timeout occurs. The jobs seem to fall into a restart-and-fail loop. Looking at the logs, the TaskManager logs grow very large with the same messages repeating over and over again. I've attached a file containing them. The two lines that give me pause are:
Closing the Kafka producer with timeoutMillis = 0 ms.
Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.
I'm not really sure which timeout this is, but it looks like there is a timeout loop happening here.
The Kafka producer has been configured as follows (the transaction timeout on the Kafka brokers has been set to match the producer's):
Properties kafkaProducerProps = new Properties();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Additionally, each TaskManager has been configured with 4 GB of memory, there is a sliding window of 10 seconds with a slide of 1 second, and the cluster is deployed using Flink native.
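For context, a minimal, self-contained sketch of how this kind of EXACTLY_ONCE producer setup is typically wired up with the Flink 1.12 Kafka connector. The broker address, topic, serialization schema, checkpoint interval, and the transaction.timeout.ms value below are illustrative placeholders, not the actual job configuration.

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceProducerSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // placeholder checkpoint interval

        Properties kafkaProducerProps = new Properties();
        kafkaProducerProps.setProperty("bootstrap.servers", "kafka:9092"); // placeholder broker
        // Must not exceed the broker's transaction.max.timeout.ms, and should be
        // comfortably larger than the worst-case checkpoint duration, otherwise the
        // broker aborts transactions the job still needs to commit after recovery.
        kafkaProducerProps.setProperty("transaction.timeout.ms", "900000");

        KafkaSerializationSchema<String> serializationSchema = (element, timestamp) ->
                new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8));

        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "output-topic",                            // placeholder default topic
                serializationSchema,
                kafkaProducerProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // two-phase commit tied to checkpoints

        env.fromElements("a", "b", "c").addSink(producer);
        env.execute("exactly-once producer sketch");
    }
}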
Any hints would be much appreciated!
Regards, M.
taskmanager_logs.log (29K)
It could very well be that your job gets stuck in a restart loop for some reason. Can you either post the full TaskManager logs here, or try to figure out yourself why the first checkpoint that timed out did so? Backpressure or blocked operators are a common cause. In your case, it could very well be that the Kafka producer is not confirming the checkpoint because of the Kafka transactions. If backpressure is causing this, consider enabling unaligned checkpoints. It could also be the case that the Kafka transactions are too slow, which would itself cause backpressure and checkpoint timeouts.
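In case unaligned checkpoints are worth trying, here is a minimal sketch against the Flink 1.12 API; the interval and timeout values below are placeholders.

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnalignedCheckpointsSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder interval; unaligned checkpoints require exactly-once checkpointing mode.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        // Let barriers overtake in-flight records so checkpoints can complete
        // even while the pipeline is backpressured.
        env.getCheckpointConfig().enableUnalignedCheckpoints();

        // Optionally give checkpoints more headroom than the current 6 minutes.
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);
    }
}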