Hello!
I'm having some problems with my KafkaProducer that I've been unable to find a solution to. I've set up a simple Flink job that reads from one Kafka topic, using

    kafkaProps.setProperty("isolation.level", "read_committed");

since I want exactly-once semantics in my application. After doing some enriching of the data I read from Kafka, I have the following producer set up:

    FlinkKafkaProducer<PlayerSessionEnriched> kafkaSinkProducer = new FlinkKafkaProducer<>(
            "enrichedPlayerSessionsTest",
            new KafkaStringSerializationSchema("enrichedPlayerSessionsTest"),
            producerProps,
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

The producer above is then added as a sink at the end of my Flink job. When I run this application I get the following messages:

    13:44:40,758 INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-6, transactionalId=Source: playerSession and playserSessionStarted from Kafka -> Filter out playerSessionStarted -> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-2] ProducerId set to 21280 with epoch 4
    13:44:40,759 INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-6, transactionalId=Source: playerSession and playserSessionStarted from Kafka -> Filter out playerSessionStarted -> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-2] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

Sometimes I also see the following:

    13:44:43,740 INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-26, transactionalId=Source: playerSession and playserSessionStarted from Kafka -> Filter out playerSessionStarted -> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-0] ProducerId set to -1 with epoch -1
    13:44:44,136 INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-26, transactionalId=Source: playerSession and playserSessionStarted from Kafka -> Filter out playerSessionStarted -> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-0] ProducerId set to 21297 with epoch 11
    13:44:44,147 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 has no restore state.

Since this isn't an error, the job doesn't crash while running, and data does get written to Kafka even with these messages. However, it does seem wrong to me and I'm wondering if anyone has any insight into why this is happening.

I'm attaching a gist with the complete log from the application. I ran the job with env.setParallelism(1) but I still get around 26 producers created, which seems odd to me. Running without any parallelism set creates about 300-400 producers (based on the clientIds reported).

Thankful for any insight into this!

Best regards,
Tim

Attachment: flink-kafka-producer-timeoutMillis-error.gist (243K)
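For context, the source that uses those kafkaProps is a plain FlinkKafkaConsumer, roughly like this sketch (the broker address, group id, topic name and deserialization schema below are placeholders, not the real job code):

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092");    // placeholder broker address
    kafkaProps.setProperty("group.id", "player-session-enricher");    // placeholder group id
    // Only read events from committed Kafka transactions.
    kafkaProps.setProperty("isolation.level", "read_committed");

    // "playerSessions" and SimpleStringSchema are placeholders; the real job uses its own topic and schema.
    DataStream<String> sessions = env.addSource(
            new FlinkKafkaConsumer<>("playerSessions", new SimpleStringSchema(), kafkaProps));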
To add to this: setting FlinkKafkaProducer.Semantic.AT_LEAST_ONCE instead of EXACTLY_ONCE makes the problem go away, so I imagine there is something wrong with my setup. I'm using Kafka 2.2 and I have the following set on the cluster:

    transaction.max.timeout.ms=3600000
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
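Worth noting for anyone comparing setups: with Semantic.EXACTLY_ONCE the sink uses Kafka transactions, and the producer-side transaction.timeout.ms must not exceed the broker's transaction.max.timeout.ms, which is why that broker setting is raised to one hour above. A rough sketch of the producer property involved; the 15-minute value is only an example, not what the job actually uses:

    // Must be <= the broker's transaction.max.timeout.ms (3600000 above).
    // FlinkKafkaProducer defaults this to one hour when EXACTLY_ONCE is used,
    // which is larger than the Kafka broker default of 15 minutes.
    producerProps.setProperty("transaction.timeout.ms", "900000");  // example: 15 minutes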
Hi,

One thing to clarify first: I think the "Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms" log doesn't necessarily mean that a producer was closed because it timed out. I guess that is just the message Kafka logs when a producer is closed without specifying a timeout (i.e. an infinite timeout, which is Long.MAX_VALUE in Kafka's case).

With that in mind: when the FlinkKafkaProducer is used with exactly-once semantics, it keeps a fixed-size pool of short-lived Kafka producers, one created for each concurrent checkpoint. When a checkpoint begins, the FlinkKafkaProducer creates a new producer for that checkpoint; once that checkpoint completes, it attempts to close and recycle that checkpoint's producer. So it is normal to see logs of Kafka producers being closed if you're using an exactly-once transactional FlinkKafkaProducer.

Best,
Gordon
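Putting that pooling behaviour into code, this is roughly how checkpointing and the producer pool fit together. The checkpoint interval, broker address and explicit pool size below are illustrative only; newer FlinkKafkaProducer versions expose a constructor overload that takes the pool size, and the default is 5:

    import java.util.Properties;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // EXACTLY_ONCE ties each Kafka transaction to a checkpoint, so checkpointing must be enabled;
    // one producer from the pool is in use per in-flight (concurrent) checkpoint.
    env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

    Properties producerProps = new Properties();
    producerProps.setProperty("bootstrap.servers", "localhost:9092");   // placeholder broker address
    producerProps.setProperty("transaction.timeout.ms", "900000");      // keep <= broker transaction.max.timeout.ms

    // Same sink as in the original post, with the optional pool size argument made explicit (default 5).
    FlinkKafkaProducer<PlayerSessionEnriched> kafkaSinkProducer = new FlinkKafkaProducer<>(
            "enrichedPlayerSessionsTest",
            new KafkaStringSerializationSchema("enrichedPlayerSessionsTest"),
            producerProps,
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
            5);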