ProducerFencedException when running 2 jobs with FlinkKafkaProducer

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

ProducerFencedException when running 2 jobs with FlinkKafkaProducer

cslotterback

Hey all,

 

I am running into an issue where if I run 2 flink jobs (same jar, different configuration), that produce to different kafka topics on the same broker, using the 1.7 FlinkKafkaProducer set with EXACTLY_ONCE semantics, both jobs go into a checkpoint exception loop every 15 seconds or so:

 

Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

 

As soon as one of the jobs is cancelled, things go back to normal for the other job. I tried manually setting the TRANSACTIONAL_ID_CONFIG config in the producer to be unique for each of the jobs. My producer transaction timeout is set to 5 minutes, and flink checkpointing is set to 1 minute. Is there some way to prevent these jobs from tripping over each other in execution while retaining exactly once processing?

Reply | Threaded
Open this post in threaded view
|

Re: ProducerFencedException when running 2 jobs with FlinkKafkaProducer

Tzu-Li (Gordon) Tai
Hi,

I think this is unexpected. The generated transactional ids should not be clashing.
Looking at the FlinkKafkaProducer code, it seems like the generation is only a function of the subtask id of the FlinkKafkaProducer, which could be the same across 2 different Kafka sources.

I'm not completely certain about this. Piotr (in CC) might have more insights for this.

Cheers,
Gordon

On Wed, Feb 13, 2019 at 9:15 AM Slotterback, Chris <[hidden email]> wrote:

Hey all,

 

I am running into an issue where if I run 2 flink jobs (same jar, different configuration), that produce to different kafka topics on the same broker, using the 1.7 FlinkKafkaProducer set with EXACTLY_ONCE semantics, both jobs go into a checkpoint exception loop every 15 seconds or so:

 

Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

 

As soon as one of the jobs is cancelled, things go back to normal for the other job. I tried manually setting the TRANSACTIONAL_ID_CONFIG config in the producer to be unique for each of the jobs. My producer transaction timeout is set to 5 minutes, and flink checkpointing is set to 1 minute. Is there some way to prevent these jobs from tripping over each other in execution while retaining exactly once processing?

Reply | Threaded
Open this post in threaded view
|

Re: ProducerFencedException when running 2 jobs with FlinkKafkaProducer

Rohan Thimmappa
Hi Tzu-Li,

Any updated on this. This is consistently reproducible.

Same jar - Separate source topic to Separate  destination topic.

This sort of blocker for flink upgrada. i tried with 1.7.2 but no luck.

org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:994)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:615)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:94)
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:230)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at com.comcast.ips.transformation.EMMFlatMap.flatMap(EMMFlatMap.java:98)
	at com.comcast.ips.transformation.EMMFlatMap.flatMap(EMMFlatMap.java:33)
	at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:67)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)




Rohan

On Wed, Feb 13, 2019 at 12:33 AM Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi,

I think this is unexpected. The generated transactional ids should not be clashing.
Looking at the FlinkKafkaProducer code, it seems like the generation is only a function of the subtask id of the FlinkKafkaProducer, which could be the same across 2 different Kafka sources.

I'm not completely certain about this. Piotr (in CC) might have more insights for this.

Cheers,
Gordon

On Wed, Feb 13, 2019 at 9:15 AM Slotterback, Chris <[hidden email]> wrote:

Hey all,

 

I am running into an issue where if I run 2 flink jobs (same jar, different configuration), that produce to different kafka topics on the same broker, using the 1.7 FlinkKafkaProducer set with EXACTLY_ONCE semantics, both jobs go into a checkpoint exception loop every 15 seconds or so:

 

Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

 

As soon as one of the jobs is cancelled, things go back to normal for the other job. I tried manually setting the TRANSACTIONAL_ID_CONFIG config in the producer to be unique for each of the jobs. My producer transaction timeout is set to 5 minutes, and flink checkpointing is set to 1 minute. Is there some way to prevent these jobs from tripping over each other in execution while retaining exactly once processing?



--
Thanks
Rohan
Reply | Threaded
Open this post in threaded view
|

Re: ProducerFencedException when running 2 jobs with FlinkKafkaProducer

Tzu-Li (Gordon) Tai
Hi,

I just saw a JIRA opened for this: https://issues.apache.org/jira/browse/FLINK-11654.

The JIRA ticket's description matches what I had in mind and can confirm the bug assessment. Unfortunately, I currently do not have the capacity to provide a fix and test for this.
For the meantime, I've made this a blocker for releasing 1.8.0. It would be great if someone can try out the proposed fix mentioned in the JIRA, see if it fixes the issue in your cases, and provide a PR for the patch.

Thanks,
Gordon

On Tue, Feb 19, 2019 at 9:46 AM Rohan Thimmappa <[hidden email]> wrote:
Hi Tzu-Li,

Any updated on this. This is consistently reproducible.

Same jar - Separate source topic to Separate  destination topic.

This sort of blocker for flink upgrada. i tried with 1.7.2 but no luck.

org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:994)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:615)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:94)
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:230)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at com.comcast.ips.transformation.EMMFlatMap.flatMap(EMMFlatMap.java:98)
	at com.comcast.ips.transformation.EMMFlatMap.flatMap(EMMFlatMap.java:33)
	at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:67)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)




Rohan

On Wed, Feb 13, 2019 at 12:33 AM Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi,

I think this is unexpected. The generated transactional ids should not be clashing.
Looking at the FlinkKafkaProducer code, it seems like the generation is only a function of the subtask id of the FlinkKafkaProducer, which could be the same across 2 different Kafka sources.

I'm not completely certain about this. Piotr (in CC) might have more insights for this.

Cheers,
Gordon

On Wed, Feb 13, 2019 at 9:15 AM Slotterback, Chris <[hidden email]> wrote:

Hey all,

 

I am running into an issue where if I run 2 flink jobs (same jar, different configuration), that produce to different kafka topics on the same broker, using the 1.7 FlinkKafkaProducer set with EXACTLY_ONCE semantics, both jobs go into a checkpoint exception loop every 15 seconds or so:

 

Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

 

As soon as one of the jobs is cancelled, things go back to normal for the other job. I tried manually setting the TRANSACTIONAL_ID_CONFIG config in the producer to be unique for each of the jobs. My producer transaction timeout is set to 5 minutes, and flink checkpointing is set to 1 minute. Is there some way to prevent these jobs from tripping over each other in execution while retaining exactly once processing?



--
Thanks
Rohan