Re: Default Kafka producers pool size for FlinkKafkaProducer.Semantic.EXACTLY_ONCE

Posted by Fabian Hueske-2
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Default-Kafka-producers-pool-size-for-FlinkKafkaProducer-Semantic-EXACTLY-ONCE-tp27124p27201.html

Hi Min,

I think the pool size is per parallel sink task, i.e., it should be independent of the parallelism of the sink operator.
From my understanding, a pool size of 5 should be fine if the maximum number of concurrent checkpoints is 1.
Running out of producers would mean that there are 5 in-flight checkpoints that have not completed, which seems like a lot to me (given that the sink is probably at the end of the program).
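
If you do need a larger pool, the size can be passed when constructing the producer. A minimal sketch, assuming the universal FlinkKafkaProducer's constructor overload that takes an explicit kafkaProducersPoolSize (mirroring FlinkKafkaProducer011); the broker address, topic name, and the createProducer helper are placeholders:

import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public static FlinkKafkaProducer<String> createProducer(int poolSize) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address

    return new FlinkKafkaProducer<>(
        "my-topic",                                           // hypothetical topic name
        new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()),
        props,
        Optional.empty(),                                     // no custom partitioner
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
        poolSize);                                            // DEFAULT_KAFKA_PRODUCERS_POOL_SIZE is 5
}

Each checkpoint takes one producer from the pool and returns it only once the checkpoint's transaction has been committed, so the pool size effectively bounds the number of pending transactions.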

If I remember correctly, Piotr (in CC) was working on the exactly-once feature of the Kafka producer.
Maybe he can help.

Best,
Fabian

On Mon., Apr. 8, 2019 at 14:43, <[hidden email]> wrote:

Hi,

I keep getting the exception "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints."

I understand that DEFAULT_KAFKA_PRODUCERS_POOL_SIZE is 5 and that I need to increase it. What considerations should I take into account when increasing this size? What would be a size for a normal setup, e.g. 32?

I run with a parallelism of 16 and have a checkpoint setting like this:

public static void setup(StreamExecutionEnvironment env) {
    env.enableCheckpointing(2_000);                           // checkpoint every 2 seconds
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1_000);
    env.getCheckpointConfig().setCheckpointTimeout(60_000);   // abort checkpoints after 60 seconds
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // at most one checkpoint in flight
    env.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE / 64)); // ~32 MB max state size
    //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}

Regards,

Min