Default Kafka producers pool size for FlinkKafkaProducer.Semantic.EXACTLY_ONCE

min.tan

Hi,

 

I keep getting the following exception: "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints."

 

I understand that DEFAULT_KAFKA_PRODUCERS_POOL_SIZE is 5 and that I need to increase it. What should I consider when increasing this size? What is a typical value for a normal setup, e.g. 32?

 

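I run the job with a parallelism of 16 and the following checkpoint settings:

import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public static void setup(StreamExecutionEnvironment env) {
    // Trigger a checkpoint every 2 seconds.
    env.enableCheckpointing(2_000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // Wait at least 1 second between the end of one checkpoint and the start of the next.
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1_000);
    // Abort a checkpoint that takes longer than 60 seconds.
    env.getCheckpointConfig().setCheckpointTimeout(60_000);
    // Allow only one checkpoint in progress at a time.
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    // In-memory state backend with a maximum state size of about 32 MiB.
    env.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE / 64));
    //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}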

 

Regards,

 

Min



E-mails can involve SUBSTANTIAL RISKS, e.g. lack of confidentiality, potential manipulation of contents and/or sender's address, incorrect recipient (misdirection), viruses etc. Based on previous e-mail correspondence with you and/or an agreement reached with you, UBS considers itself authorized to contact you via e-mail. UBS assumes no responsibility for any loss or damage resulting from the use of e-mails.
The recipient is aware of and accepts the inherent risks of using e-mails, in particular the risk that the banking relationship and confidential information relating thereto are disclosed to third parties.
UBS reserves the right to retain and monitor all messages. Messages are protected and accessed only in legally justified cases.
For information on how UBS uses and discloses personal data, how long we retain it, how we keep it secure and your data protection rights, please see our Privacy Notice http://www.ubs.com/privacy-statement
Re: Default Kafka producers pool size for FlinkKafkaProducer.Semantic.EXACTLY_ONCE

Fabian Hueske-2
Hi Min,

I think the pool size is per parallel sink task, i.e., it should be independent of the parallelism of the sink operator.
From my understanding a pool size of 5 should be fine if the maximum number of concurrent checkpoints is 1.
Running out of connections would mean that there are 5 in-flight checkpoints that were not completed, which seems a lot to me (given that the sink is probably at the end of the program).
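The reasoning above can be sketched with a toy model of a fixed-size producer pool. This is only an illustration and not Flink's actual implementation: the class name ProducerPoolModel and its methods are hypothetical, and the model assumes that each started checkpoint holds one producer until that checkpoint is reported complete.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy model of a per-sink-task producer pool (hypothetical, not Flink code). */
class ProducerPoolModel {
    // Producers that are currently free to be used by a new checkpoint.
    private final Deque<Integer> available = new ArrayDeque<>();
    // Producers held by checkpoints that have started but not yet completed.
    private final Map<Long, Integer> inFlight = new HashMap<>();

    ProducerPoolModel(int poolSize) {
        for (int i = 0; i < poolSize; i++) {
            available.add(i);
        }
    }

    /** A checkpoint starts: take one producer from the pool, or fail if none is left. */
    void onSnapshot(long checkpointId) {
        Integer producer = available.poll();
        if (producer == null) {
            throw new IllegalStateException(
                "Too many ongoing snapshots (pool exhausted at checkpoint " + checkpointId + ")");
        }
        inFlight.put(checkpointId, producer);
    }

    /** A checkpoint completes: its producer goes back to the pool. */
    void onCheckpointComplete(long checkpointId) {
        Integer producer = inFlight.remove(checkpointId);
        if (producer != null) {
            available.add(producer);
        }
    }

    int availableProducers() {
        return available.size();
    }

    public static void main(String[] args) {
        ProducerPoolModel pool = new ProducerPoolModel(5);
        // Five checkpoints start and none of them completes.
        for (long id = 1; id <= 5; id++) {
            pool.onSnapshot(id);
        }
        try {
            pool.onSnapshot(6); // a sixth in-flight checkpoint exhausts the pool
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        pool.onCheckpointComplete(1); // completing one checkpoint frees a producer
        pool.onSnapshot(6);
        System.out.println("available: " + pool.availableProducers());
    }
}
```

In this model the pool is only exhausted when as many checkpoints as the pool size are in flight at once, which is why the parallelism of the sink does not enter the picture.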

If I remember correctly, Piotr (in CC) was working on the exactly-once feature of the Kafka producer.
Maybe he can help.

Best,
Fabian

(In reply to the original post above, sent by <[hidden email]> on Mon, 8 Apr 2019 at 14:43.)

Re: Default Kafka producers pool size for FlinkKafkaProducer.Semantic.EXACTLY_ONCE

Piotr Nowojski-3
Hi Min and Fabian,

The pool size is independent of the parallelism, task slots count or task managers count. The only thing that you should consider is how many simultaneous checkpoints you might have in your setup.

As Fabian wrote, with

> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

the default pool size of 5 should be more than enough.

Could you double-check that nothing is overriding those configuration values? If nothing is, could you provide the JobManager and TaskManager logs?

Piotrek

(In reply to Fabian Hueske's message above, sent on 11 Apr 2019 at 09:32.)

RE: Re: Default Kafka producers pool size for FlinkKafkaProducer.Semantic.EXACTLY_ONCE

min.tan

Many thanks for your replies.

 

After I increased MinPauseBetweenCheckpoints and moved to a memory backend for checkpoints, the exception disappeared.
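A rough way to see how a longer minimum pause spaces checkpoints out is sketched below. It assumes a simplified scheduling rule: with at most one concurrent checkpoint, the next checkpoint starts no earlier than one interval after the previous start and no earlier than the minimum pause after the previous completion. This is a simplification and not Flink's exact coordinator logic; the class name CheckpointSpacing and the 1500 ms checkpoint durations are hypothetical.

```java
import java.util.Arrays;

/** Simplified model of checkpoint start times (hypothetical, not Flink code). */
class CheckpointSpacing {

    /**
     * Returns the start time in milliseconds of each checkpoint, given the
     * checkpoint interval, the minimum pause, and how long each checkpoint takes.
     */
    static long[] startTimes(long intervalMs, long minPauseMs, long[] durationsMs) {
        long[] starts = new long[durationsMs.length];
        for (int i = 1; i < durationsMs.length; i++) {
            long previousEnd = starts[i - 1] + durationsMs[i - 1];
            // The later of the two constraints decides when the next checkpoint starts.
            starts[i] = Math.max(starts[i - 1] + intervalMs, previousEnd + minPauseMs);
        }
        return starts;
    }

    public static void main(String[] args) {
        long[] durations = {1_500, 1_500, 1_500}; // assumed durations, for illustration only
        // Interval 2 s with a 1 s minimum pause, as in the original settings.
        System.out.println(Arrays.toString(startTimes(2_000, 1_000, durations)));
        // Same interval with a longer, 5 s minimum pause.
        System.out.println(Arrays.toString(startTimes(2_000, 5_000, durations)));
    }
}
```

Under these assumptions a larger minimum pause pushes the checkpoint starts further apart, so fewer checkpoints begin within any given period.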

 

Thank you both again for your help.

 

 

Regards,

 

Min

(In reply to Piotr Nowojski's message above, sent on Thursday, 11 April 2019 at 15:01.)