Hello!
I'm having some trouble getting exactly-once guarantees working for my Flink-Kafka setup. I'm running Flink 1.10 and Kafka 2.2 (with default config), and when trying to start the program I keep getting:

    Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.

After reading around, I found that the suggested solution is to set transaction.timeout.ms for the producer in Flink to match transaction.max.timeout.ms in Kafka (which should default to 15 minutes). However, even after doing this I still get the same error, so I must be doing something else wrong.

My current code looks like this (a sketch of the setup follows this message):

    (setting up stream environment)
    (setting up the producer)

And then I simply have a consumer that consumes from a different Kafka cluster and writes to a new cluster, using the producer above.

Also, when checking my logs I see the following message:

    11:41:56,345 INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
        acks = 1
        [omitted for brevity]
        transaction.timeout.ms = 900000
        transactional.id = Source: Read player events from Kafka -> Map Json to HashMap -> Add CanonicalTime as timestamp -> Filter dates not needed for backfill -> Sink: Post events to playerEvents Kafka-a15b4dd4812495cebdc94e33125ef858-1
        value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

If anyone has any experience with solving this problem it would be greatly appreciated.

Best regards,
Tim
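For reference, a minimal sketch of what such a setup typically looks like on Flink 1.10 — the topic names, broker addresses, group id, and the string serialization schema below are placeholders, not the original code:

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

    public class BackfillJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Exactly-once Kafka sinks only commit transactions on checkpoints,
            // so checkpointing must be enabled.
            env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);

            Properties consumerProps = new Properties();
            consumerProps.setProperty("bootstrap.servers", "source-kafka:9092"); // placeholder
            consumerProps.setProperty("group.id", "backfill");                   // placeholder

            Properties producerProps = new Properties();
            producerProps.setProperty("bootstrap.servers", "target-kafka:9092"); // placeholder
            // Must be <= transaction.max.timeout.ms on the target brokers.
            producerProps.setProperty("transaction.timeout.ms", "900000");

            // Read from one cluster, write to another.
            DataStream<String> events = env.addSource(
                    new FlinkKafkaConsumer<>("playerEventsSource", new SimpleStringSchema(), consumerProps));

            events.addSink(new FlinkKafkaProducer<>(
                    "playerEvents",
                    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                    producerProps,
                    FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

            env.execute("Backfill player events");
        }
    }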
On 10.11.20 11:53, Tim Josefsson wrote:
> Also when checking my logs I see the following message:
>     11:41:56,345 INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
>         acks = 1
>         [omitted for brevity]
>         transaction.timeout.ms = 900000
>         transactional.id = Source: Read player events from Kafka -> Map Json to HashMap -> Add CanonicalTime as timestamp -> Filter dates not needed for backfill -> Sink: Post events to playerEvents Kafka-a15b4dd4812495cebdc94e33125ef858-1
>         value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

The interesting thing would be to figure out where that `transaction.timeout.ms = 900000` is coming from. The default from Flink would be 60000, if nothing is configured. Are you specifying that value, maybe from the commandline or in code?

Maybe it's a funny coincidence, but our StreamingKafkaITCase also specifies that timeout value.

Best,
Aljoscha
Hey Aljoscha,

I'm setting transaction.timeout.ms when I create the FlinkKafkaProducer: I create a Properties object, set the property, and finally pass those properties in when creating the producer.

    Properties producerProps = new Properties();
    producerProps.setProperty("transaction.timeout.ms", "900000");

If I don't set that property, I instead get the following config when starting the job:
    11:41:56,345 INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
        acks = 1
        [omitted for brevity]
        transaction.timeout.ms = 60000
        transactional.id = Source: Read player events from Kafka -> Map Json to HashMap -> Add CanonicalTime as timestamp -> Filter dates not needed for backfill -> Sink: Post events to playerEvents Kafka-a15b4dd4812495cebdc94e33125ef858-1
        value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

So I imagine the Producer is picking up the change, but it still returns errors when running the job.

Best regards,
Tim

On Tue, 10 Nov 2020 at 16:14, Aljoscha Krettek <[hidden email]> wrote:
> [quoted message omitted for brevity]
Hmm, could you please post the full stack trace that leads to the TimeoutException?

Best,
Aljoscha

On 10.11.20 17:54, Tim Josefsson wrote:
> [quoted message omitted for brevity]
Sure, I've attached it to this email. The process seems to restart once the TimeoutException happens, so it's repeated a couple of times. Thanks for looking at it!

/Tim

On Wed, 11 Nov 2020 at 10:37, Aljoscha Krettek <[hidden email]> wrote:
> Hmm, could you please post the full stack trace that leads to the TimeoutException?
Attachment: flink-exactly-once-problem.log (128K)
Also, I realized I had a typo in the config dump in my previous email (the one from the 10th). If I don't set

    Properties producerProps = new Properties();
    producerProps.setProperty("transaction.timeout.ms", "900000");

then the value reported by the ProducerConfig is 3600000, not 60000 as I had written.

On Thu, 12 Nov 2020 at 13:37, Tim Josefsson <[hidden email]> wrote:
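For context, 3600000 ms is one hour, which is the default transaction timeout the Flink docs describe for exactly-once Kafka producers. Conceptually the behavior is roughly the following (an illustration of the documented default, not the actual Flink source):

    // Per the Flink docs: if transaction.timeout.ms is not set explicitly,
    // FlinkKafkaProducer fills in a one-hour default (3600000 ms).
    if (!producerProps.containsKey("transaction.timeout.ms")) {
        producerProps.setProperty("transaction.timeout.ms", String.valueOf(60 * 60 * 1000));
    }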
To add to this: I've now had our ops team set transaction.max.timeout.ms on our Kafka brokers to 1 hour (as suggested by the Flink docs). However, the problem persists and I'm still getting the same error message. I've confirmed that this config setting is actually set on the Kafka brokers, to rule out any mistakes there.

Best,
Tim

On Thu, 12 Nov 2020 at 14:46, Tim Josefsson <[hidden email]> wrote:
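For anyone wanting to run the same check, one way to confirm the broker-side value from Java is Kafka's AdminClient. A small sketch — the bootstrap address and the broker id "0" are placeholders:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.common.config.ConfigResource;

    public class CheckTxMaxTimeout {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "target-kafka:9092"); // placeholder
            try (AdminClient admin = AdminClient.create(props)) {
                // Broker configs are per-broker; "0" is a placeholder broker id.
                ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
                Config config = admin.describeConfigs(Collections.singleton(broker))
                        .all().get().get(broker);
                // Expect 3600000 if the one-hour setting took effect.
                System.out.println(config.get("transaction.max.timeout.ms").value());
            }
        }
    }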
Hi Tim,

afaik we are confusing two things here: there is a transaction timeout, i.e. how long a transaction may last until it is aborted, while what you see here is a timeout while creating the transaction in the first place. A quick google search turned up [1], from which I'd infer that you need to set TRANSACTIONAL_ID_CONFIG.

On Thu, Nov 12, 2020 at 3:48 PM Tim Josefsson <[hidden email]> wrote:
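A sketch of that suggestion in plain producer configuration — the id string is a placeholder, and note that the log earlier in the thread shows Flink's FlinkKafkaProducer already generating its own transactional.id from the operator name:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    Properties producerProps = new Properties();
    // ProducerConfig.TRANSACTIONAL_ID_CONFIG is the constant for "transactional.id".
    // The id value here is a placeholder, not from the original thread.
    producerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "player-events-backfill-1");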
--
Arvid Heise | Senior Java Developer

Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng