Hi everyone,
I'm trying to test exactly-once functionality with my job under production load. The job reads from Kafka, uses the Kafka timestamp as event time, aggregates every minute, and writes to another Kafka topic. I use a checkpoint interval of 10 seconds.

Everything seems to work fine, but when I look at the log at INFO level, I see that with each checkpoint a new Kafka producer is created and then closed again.

1. Is this how it is supposed to work?
2. Is a checkpoint interval of 10 seconds too frequent?

Thanks,
Maxim.
Hi Maxim
If you use the EXACTLY_ONCE semantic (instead of AT_LEAST_ONCE or NONE) for the Flink Kafka producer, it creates a new producer for every new checkpoint [1]. This is by design. From my point of view, a checkpoint interval of 10 seconds might be a bit too frequent; in general, I think an interval of 3 minutes should be enough. If you cannot afford the source rewind time after a failover, you could checkpoint more often.
Best
Yun Tang
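To put rough numbers on the trade-off described above, here is a back-of-the-envelope sketch. It assumes one new producer per checkpoint per producer instance (as stated in the reply) and approximates the worst-case rewind after a failover as one full checkpoint interval, ignoring checkpoint duration. The class name, method names, and the record rate of 5,000 records per second are hypothetical and chosen only for illustration.

```java
import java.time.Duration;

/** Rough arithmetic for a given checkpoint interval; illustrative only, not Flink code. */
final class CheckpointIntervalSketch {

    /** Checkpoints per hour, which equals producers created per hour if each checkpoint creates one. */
    static long checkpointsPerHour(Duration interval) {
        return Duration.ofHours(1).toMillis() / interval.toMillis();
    }

    /** Approximate upper bound on records re-read after a failover (one full interval of input). */
    static long worstCaseReplayedRecords(Duration interval, long recordsPerSecond) {
        return interval.getSeconds() * recordsPerSecond;
    }

    public static void main(String[] args) {
        long recordsPerSecond = 5_000; // hypothetical input rate
        Duration[] intervals = {Duration.ofSeconds(10), Duration.ofMinutes(3)};
        for (Duration interval : intervals) {
            System.out.printf("interval=%ds: %d producers/hour, up to %d records replayed%n",
                    interval.getSeconds(),
                    checkpointsPerHour(interval),
                    worstCaseReplayedRecords(interval, recordsPerSecond));
        }
    }
}
```

Under these assumptions, a longer interval means fewer producers are created and closed, but more input is re-read after a failure.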
From: Maxim Parkachov <[hidden email]>
Sent: Monday, April 6, 2020 23:16
To: [hidden email] <[hidden email]>
Subject: New kafka producer on each checkpoint
Hi Yun,

thanks for the answer. I have now increased the checkpoint interval, but I still don't understand the reason for creating a producer and reconnecting to the Kafka broker each time. According to the documentation:

Note: Semantic.EXACTLY_ONCE mode uses a fixed size pool of KafkaProducers per each FlinkKafkaProducer011 instance. One of each of those producers is used per one checkpoint. If the number of concurrent checkpoints exceeds the pool size, FlinkKafkaProducer011 will throw an exception and will fail the whole application. Please configure max pool size and max number of concurrent checkpoints accordingly.

I assumed that this also holds for post-011 producers. I expected 5 (the default) producers to be created and reused without re-instantiating a producer each time. In my case checkpoints are so fast that I will never have concurrent checkpoints.

Regards,
Maxim.

On Wed, Apr 8, 2020 at 4:52 AM Yun Tang <[hidden email]> wrote:
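The pool behaviour described in the quoted documentation note can be pictured with a toy model: a fixed number of slots, one slot claimed per in-flight checkpoint, and a failure when more checkpoints are in flight than there are slots. This is a sketch for reasoning only, not Flink's implementation; the class and method names are hypothetical, and the model says nothing about whether a slot corresponds to a live producer object or only to an identifier.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy model of a fixed-size pool with one slot per in-flight checkpoint. Not Flink code. */
final class ProducerPoolSketch {
    private final Deque<Integer> freeSlots = new ArrayDeque<>();
    private final Map<Long, Integer> slotByCheckpoint = new HashMap<>();

    ProducerPoolSketch(int poolSize) {
        for (int slot = 0; slot < poolSize; slot++) {
            freeSlots.addLast(slot);
        }
    }

    /** A checkpoint starts: claim a free slot, or fail if every slot is busy. */
    int onCheckpointStarted(long checkpointId) {
        Integer slot = freeSlots.pollFirst();
        if (slot == null) {
            throw new IllegalStateException("Concurrent checkpoints exceed pool size");
        }
        slotByCheckpoint.put(checkpointId, slot);
        return slot;
    }

    /** A checkpoint completes: its slot is returned to the pool. */
    void onCheckpointCompleted(long checkpointId) {
        Integer slot = slotByCheckpoint.remove(checkpointId);
        if (slot != null) {
            freeSlots.addLast(slot);
        }
    }

    /** Number of checkpoints currently holding a slot. */
    int inFlight() {
        return slotByCheckpoint.size();
    }

    public static void main(String[] args) {
        ProducerPoolSketch pool = new ProducerPoolSketch(5); // 5 is the default mentioned in the thread
        // Fast, strictly sequential checkpoints never hold more than one slot at a time.
        for (long id = 1; id <= 100; id++) {
            pool.onCheckpointStarted(id);
            pool.onCheckpointCompleted(id);
        }
        System.out.println("in flight after sequential run: " + pool.inFlight());
    }
}
```

In this model, a job whose checkpoints never overlap only ever occupies one slot, which matches the expectation in the message above that the pool limit is never reached.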
Hi Maxim,

That is a good question. I don't see an obvious reason why we cannot reuse the producers. That said, there might be some corner cases where the KafkaProducer is not reusable. For example, if the checkpoint interval is long, the producer.id assigned by the TransactionCoordinator may have expired on the broker side, and the producer may not be reusable anymore. But that should be a rare case.

[hidden email] might know more about why the producers were not reused when this was initially implemented.

Thanks,
Jiangjie (Becket) Qin

On Mon, Apr 13, 2020 at 4:59 PM Maxim Parkachov <[hidden email]> wrote:
A slightly more common case that may make the producer non-reusable is when there is no data for a long time: the producer won't send any requests to the broker, and the transactional.id may also expire on the broker side.

On Tue, Apr 14, 2020 at 8:44 AM Becket Qin <[hidden email]> wrote:
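The idle-expiry concern can be expressed as a simple check: a producer that has sent nothing for at least the broker-side expiration period may no longer be usable. The sketch below is only an illustration of that condition. The class and method names are hypothetical, and the 7-day expiration is an assumed example value (the relevant broker setting is believed to be `transactional.id.expiration.ms`; verify the actual value against your broker configuration).

```java
import java.time.Duration;

/** Illustrates the idle-expiry condition discussed in the thread. Not Flink or Kafka code. */
final class ProducerReuseSketch {

    /** True if the producer has been idle for at least the broker-side expiration period. */
    static boolean mayHaveExpired(Duration idleTime, Duration expiration) {
        return idleTime.compareTo(expiration) >= 0;
    }

    public static void main(String[] args) {
        Duration expiration = Duration.ofDays(7); // assumed example value, check your broker config
        System.out.println(mayHaveExpired(Duration.ofMinutes(3), expiration));
        System.out.println(mayHaveExpired(Duration.ofDays(8), expiration));
    }
}
```

A reuse strategy would need a check of this kind (or error handling on the first send) before handing an idle producer back to a new checkpoint.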