Many thanks for your replies.
After I increased MinPauseBetweenCheckpoints and moved to a memory backend for checkpoints, the exception disappeared.
Thank you both again for your help.
Regards,
Min
From: Piotr Nowojski [mailto:[hidden email]]
Sent: Thursday, 11 April 2019 15:01
To: Fabian Hueske
Cc: Tan, Min; user
Subject: [External] Re: Default Kafka producers pool size for FlinkKafkaProducer.Semantic.EXACTLY_ONCE
Hi Min and Fabian,
The pool size is independent of the parallelism, task slots count or task managers count. The only thing that you should consider is how many simultaneous checkpoints you might have in your setup.
As Fabian wrote, with
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
the default pool size of 5 should be more than enough.
Could you double-check whether something is overriding those configuration values? If nothing is, could you provide the JobManager and TaskManager logs?
Piotrek
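A rough way to picture the relationship between pool size and in-flight checkpoints is the toy model below. It is a sketch only, not Flink's implementation: the class ProducerPoolModel and its methods are hypothetical names, and it assumes that each snapshot parks the current producer until its checkpoint is confirmed and takes a fresh one from the pool, and that a producer returns to the pool only when that confirmation arrives.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy model (hypothetical, not Flink code) of one sink subtask's producer pool. */
final class ProducerPoolModel {
    // Producers that are free to start a new transaction.
    private final Deque<Integer> available = new ArrayDeque<>();
    // Checkpoint id -> producer parked until that checkpoint is confirmed.
    private final Map<Long, Integer> pending = new HashMap<>();
    // Producer currently receiving records.
    private int current;

    ProducerPoolModel(int poolSize) {
        if (poolSize < 1) {
            throw new IllegalArgumentException("poolSize must be at least 1");
        }
        for (int i = 0; i < poolSize; i++) {
            available.add(i);
        }
        current = available.poll();
    }

    /** A checkpoint barrier arrives: park the current producer, take a fresh one. */
    void snapshot(long checkpointId) {
        Integer next = available.poll();
        if (next == null) {
            throw new IllegalStateException("Too many ongoing snapshots");
        }
        pending.put(checkpointId, current);
        current = next;
    }

    /** The checkpoint is confirmed: its producer goes back to the pool. */
    void checkpointCompleted(long checkpointId) {
        Integer producer = pending.remove(checkpointId);
        if (producer != null) {
            available.add(producer);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        // Confirmations keep up with snapshots: the pool never runs dry.
        ProducerPoolModel healthy = new ProducerPoolModel(5);
        for (long id = 1; id <= 100; id++) {
            healthy.snapshot(id);
            healthy.checkpointCompleted(id);
        }
        System.out.println("pending after healthy run: " + healthy.pendingCount());

        // Confirmations never arrive: the pool is eventually exhausted.
        ProducerPoolModel stalled = new ProducerPoolModel(5);
        try {
            for (long id = 1; id <= 100; id++) {
                stalled.snapshot(id);
            }
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + " with " + stalled.pendingCount() + " pending");
        }
    }
}
```

In this model the pool only runs out when confirmations lag behind snapshots, which is consistent with the point above that the number of unfinished checkpoints matters and the parallelism does not.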
On 11 Apr 2019, at 09:32, Fabian Hueske <[hidden email]> wrote:
Hi Min,
I think the pool size is per parallel sink task, i.e., it should be independent of the parallelism of the sink operator.
From my understanding a pool size of 5 should be fine if the maximum number of concurrent checkpoints is 1.
Running out of connections would mean that there are 5 in-flight checkpoints that were not completed, which seems a lot to me (given that the sink is probably at the end of the program).
If I remember correctly, Piotr (in CC) was working on the exactly-once feature of the Kafka producer.
Maybe he can help.
Best,
Fabian
On Mon, 8 Apr 2019 at 14:43, <[hidden email]> wrote:
Hi,
I keep getting the exception "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints."
I understand that DEFAULT_KAFKA_PRODUCERS_POOL_SIZE is 5 and that I need to increase it. What should I consider when choosing a larger value? What is a typical size, e.g. 32?
I run with a parallelism of 16 and have a checkpoint configuration like this:
public static void setup(StreamExecutionEnvironment env) {
    env.enableCheckpointing(2_000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1_000);
    env.getCheckpointConfig().setCheckpointTimeout(60_000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE/64));
    //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}
Regards,
Min