Problems with FlinkKafkaProducer - closing after timeoutMillis = 9223372036854775807 ms.


Tim Josefsson
Hello!

I'm having some problems with my KafkaProducer that I've been unable to find a solution to.

I've set up a simple Flink job that reads from one Kafka topic, using
   kafkaProps.setProperty("isolation.level", "read_committed")
since I want exactly-once semantics in my application.
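
For context, the consumer side is set up roughly like this (a minimal sketch; the bootstrap servers, group id and topic name here are placeholders, not my actual values):

    // Sketch of the consumer that only reads committed records.
    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
    kafkaProps.setProperty("group.id", "player-session-enricher");
    // Skip records from aborted or still-open transactions.
    kafkaProps.setProperty("isolation.level", "read_committed");

    FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
            "playerSessions",
            new SimpleStringSchema(),
            kafkaProps);

    DataStream<String> sessions = env.addSource(kafkaSource);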

After enriching the data I read from Kafka, I have the following producer set up:

FlinkKafkaProducer<PlayerSessionEnriched> kafkaSinkProducer = new FlinkKafkaProducer<>(
                "enrichedPlayerSessionsTest",
                new KafkaStringSerializationSchema("enrichedPlayerSessionsTest"),
                producerProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );

The producer above is then added as a sink at the end of my Flink job.
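For illustration, attaching it looks roughly like this ("enrichedSessions" is just a placeholder name for my actual enriched stream):

    // Attach the transactional producer as the sink of the enriched stream.
    enrichedSessions.addSink(kafkaSinkProducer)
            .name("Post sessionEnriched to Kafka");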

Now when I run this application I get the following messages:
    13:44:40,758 INFO  org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer clientId=producer-6, transactionalId=Source: playerSession and playserSessionStarted from Kafka -> Filter out playerSessionStarted -> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-2] ProducerId set to 21280 with epoch 4
    13:44:40,759 INFO  org.apache.kafka.clients.producer.KafkaProducer               - [Producer clientId=producer-6, transactionalId=Source: playerSession and playserSessionStarted from Kafka -> Filter out playerSessionStarted -> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-2] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
Sometimes I also see the following:
    13:44:43,740 INFO  org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer clientId=producer-26, transactionalId=Source: playerSession and playserSessionStarted from Kafka -> Filter out playerSessionStarted -> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-0] ProducerId set to -1 with epoch -1
    13:44:44,136 INFO  org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer clientId=producer-26, transactionalId=Source: playerSession and playserSessionStarted from Kafka -> Filter out playerSessionStarted -> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-0] ProducerId set to 21297 with epoch 11
13:44:44,147 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 has no restore state.
Since this isn't an error, the job doesn't crash and data does get written to Kafka despite the message. However, it does seem wrong to me, and I'm wondering if anyone has any insight into why this is happening.
I'm attaching a gist with the complete log from the application. I ran the job with env.setParallelism(1), but around 26 producers are still created, which seems odd to me. Running without any parallelism set creates about 300-400 producers (based on the clientIds reported).
Thankful for any insight into this!
Best regards,
Tim

Attachment: flink-kafka-producer-timeoutMillis-error.gist (243K)

Re: Problems with FlinkKafkaProducer - closing after timeoutMillis = 9223372036854775807 ms.

Tim Josefsson
To add to this, setting FlinkKafkaProducer.Semantic.AT_LEAST_ONCE instead of EXACTLY_ONCE makes the problem go away, so I imagine something is wrong with my setup.
I'm using Kafka 2.2 and have the following settings on the cluster:
transaction.max.timeout.ms=3600000
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
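
For reference, a sketch of how the producer properties line up with the broker setting above (values here are illustrative; the producer's transaction.timeout.ms must not exceed the broker's transaction.max.timeout.ms):

    // Sketch of producerProps for the exactly-once sink.
    // transaction.timeout.ms must be <= transaction.max.timeout.ms on the broker
    // (3600000 ms above), otherwise the producer fails when initializing transactions.
    Properties producerProps = new Properties();
    producerProps.setProperty("bootstrap.servers", "localhost:9092");
    producerProps.setProperty("transaction.timeout.ms", "3600000");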


Re: Problems with FlinkKafkaProducer - closing after timeoutMillis = 9223372036854775807 ms.

Tzu-Li (Gordon) Tai
Hi,

One thing to clarify first:
I think the "Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms" log doesn't necessarily mean that a producer was closed due to timeout (Long.MAX_VALUE).
I guess that is just a Kafka log message that is logged when a Kafka producer is closed without specifying a timeout (i.e., infinite timeout, or Long.MAX_VALUE in Kafka's case).

With that in mind, when using exactly-once semantics for the FlinkKafkaProducer, there is a fixed-size pool of short-lived Kafka producers that are created for each concurrent checkpoint.
When a checkpoint begins, the FlinkKafkaProducer creates a new producer for that checkpoint. Once that checkpoint completes, Flink attempts to close and recycle the producer for that checkpoint.
So, it is normal to see logs of Kafka producers being closed if you're using an exactly-once transactional FlinkKafkaProducer.
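Note also that with EXACTLY_ONCE the pending transactions are only committed when a checkpoint completes, so the job needs checkpointing enabled; roughly something like this (the 10 second interval is only an example):

    // Transactions of the exactly-once producer are committed on checkpoint completion,
    // so checkpointing must be enabled (the 10s interval is only illustrative).
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(10_000, CheckpointingMode.EXACTLY_ONCE);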

Best,
Gordon
