Hi,

I keep getting the exception "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints."

I understand that DEFAULT_KAFKA_PRODUCERS_POOL_SIZE is 5 and that I need to increase it. What should I consider when increasing this size? What is a normal value, e.g. 32?

I run with a parallelism of 16 and have a checkpoint setting like this:

    public static void setup(StreamExecutionEnvironment env) {
        env.enableCheckpointing(2_000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1_000);
        env.getCheckpointConfig().setCheckpointTimeout(60_000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE/64));
        //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }

Regards,
Min

E-mails can involve SUBSTANTIAL RISKS, e.g. lack of confidentiality, potential manipulation of contents and/or sender's address, incorrect recipient (misdirection), viruses etc. Based on previous e-mail correspondence with you and/or an agreement reached with you, UBS considers itself authorized to contact you via e-mail. UBS assumes no responsibility for any loss or damage resulting from the use of e-mails. The recipient is aware of and accepts the inherent risks of using e-mails, in particular the risk that the banking relationship and confidential information relating thereto are disclosed to third parties. UBS reserves the right to retain and monitor all messages. Messages are protected and accessed only in legally justified cases. For information on how UBS uses and discloses personal data, how long we retain it, how we keep it secure and your data protection rights, please see our Privacy Notice http://www.ubs.com/privacy-statement
|
Hi Min,

I think the pool size is per parallel sink task, i.e., it should be independent of the parallelism of the sink operator. As far as I understand, a pool size of 5 should be fine if the maximum number of concurrent checkpoints is 1. Running out of connections would mean that there are 5 in-flight checkpoints that were not completed, which seems like a lot to me (given that the sink is probably at the end of the program).

If I remember correctly, Piotr (in CC) was working on the exactly-once feature of the Kafka producer. Maybe he can help.

Best,
Fabian

On Mon, 8 Apr 2019 at 14:43, <[hidden email]> wrote:
|
Hi Min and Fabian,

The pool size is independent of the parallelism, the number of task slots, and the number of task managers. The only thing you should consider is how many simultaneous checkpoints you might have in your setup. As Fabian wrote, with

> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

the default pool size of 5 should be more than enough.

Could you double-check that nothing is overriding those configuration values? If nothing is, could you provide the JobManager and TaskManager logs?

Piotrek
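To illustrate why only the number of unconfirmed checkpoints matters, here is a minimal sketch in plain Java. It is a simplified, hypothetical model and not Flink's actual implementation: the class ProducerPoolModel and all of its methods are made up for illustration. It assumes that each snapshot borrows one producer from a fixed-size pool and that a completion notification returns every producer borrowed so far.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified, hypothetical model of a fixed-size producer pool.
// This is NOT Flink's implementation; all names here are illustrative.
final class ProducerPoolModel {
    private final Deque<Integer> available = new ArrayDeque<>();
    private final Deque<Integer> inFlight = new ArrayDeque<>();

    ProducerPoolModel(int poolSize) {
        for (int id = 0; id < poolSize; id++) {
            available.add(id);
        }
    }

    // Assumption: a snapshot borrows one producer and holds it
    // until the checkpoint is confirmed.
    void snapshot() {
        Integer producer = available.poll();
        if (producer == null) {
            throw new IllegalStateException("Too many ongoing snapshots");
        }
        inFlight.add(producer);
    }

    // Assumption: a completion notification returns every producer
    // borrowed so far to the pool.
    void checkpointComplete() {
        available.addAll(inFlight);
        inFlight.clear();
    }

    // Counts the snapshots that succeed before the pool runs dry when a
    // completion arrives after every `completeEvery` snapshots (0 = never).
    // Stops and returns maxSnapshots if the pool never runs dry.
    static int snapshotsBeforeExhaustion(int poolSize, int completeEvery, int maxSnapshots) {
        ProducerPoolModel pool = new ProducerPoolModel(poolSize);
        for (int i = 1; i <= maxSnapshots; i++) {
            try {
                pool.snapshot();
            } catch (IllegalStateException e) {
                return i - 1;
            }
            if (completeEvery > 0 && i % completeEvery == 0) {
                pool.checkpointComplete();
            }
        }
        return maxSnapshots;
    }

    public static void main(String[] args) {
        // Every snapshot is confirmed before the next one: the pool never runs dry.
        System.out.println(snapshotsBeforeExhaustion(5, 1, 100));
        // No snapshot is ever confirmed: the pool runs dry after 5 snapshots.
        System.out.println(snapshotsBeforeExhaustion(5, 0, 100));
    }
}
```

In this model the parallelism never appears. The pool only runs dry when snapshots keep being taken while earlier ones remain unconfirmed, which is why hitting the limit with a single concurrent checkpoint points to checkpoints that are not completing rather than to a pool that is too small.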
|
Many thanks for your replies. After I increased MinPauseBetweenCheckpoints and moved to a memory backend for checkpoints, the exception disappeared.

Thank you both again for your help.

Regards,
Min