Hey all,

I am running into an issue when I run two Flink jobs (same jar, different configuration) that produce to different Kafka topics on the same broker, using the 1.7 FlinkKafkaProducer set with EXACTLY_ONCE semantics. Both jobs go into a checkpoint exception loop every 15 seconds or so:

Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

As soon as one of the jobs is cancelled, things go back to normal for the other job. I tried manually setting TRANSACTIONAL_ID_CONFIG in the producer to be unique for each of the jobs. My producer transaction timeout is set to 5 minutes, and Flink checkpointing is set to 1 minute.

Is there some way to prevent these jobs from tripping over each other in execution while retaining exactly-once processing? |
Hi,

I think this is unexpected. The generated transactional ids should not be clashing. Looking at the FlinkKafkaProducer code, it seems like the generation is only a function of the subtask id of the FlinkKafkaProducer, which could be the same across two different Kafka sources. I'm not completely certain about this. Piotr (in CC) might have more insights on this.

Cheers,
Gordon

On Wed, Feb 13, 2019 at 9:15 AM Slotterback, Chris <[hidden email]> wrote:
|
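To make the clash Gordon describes concrete, here is a minimal, self-contained Java sketch. It is not Flink's actual generator: the class, the `idsFor` helper, the pool size of 5, and the operator and job names are all hypothetical, and the ID scheme is a simplified assumption (a name prefix plus a number derived from the subtask index). It only illustrates why two jobs built from the same jar can end up with identical transactional ids, and why including something job-specific in the prefix would separate them.

```java
import java.util.LinkedHashSet;
import java.util.Set;

class TransactionalIdCollision {

    // Hypothetical, simplified scheme (NOT Flink's real code): each subtask
    // owns a contiguous block of ids, derived only from a name prefix and
    // the subtask index.
    static Set<String> idsFor(String prefix, int subtaskIndex, int poolSize) {
        Set<String> ids = new LinkedHashSet<>();
        long start = (long) subtaskIndex * poolSize;
        for (int i = 0; i < poolSize; i++) {
            ids.add(prefix + "-" + (start + i));
        }
        return ids;
    }

    // Returns the ids that appear in both sets.
    static Set<String> overlap(Set<String> a, Set<String> b) {
        Set<String> common = new LinkedHashSet<>(a);
        common.retainAll(b);
        return common;
    }

    public static void main(String[] args) {
        int poolSize = 5; // assumed pool size, for illustration only

        // Same jar, so the same operator name in both jobs; same subtask index 0.
        Set<String> jobA = idsFor("Sink: kafka", 0, poolSize);
        Set<String> jobB = idsFor("Sink: kafka", 0, poolSize);
        System.out.println("Shared ids without a job-specific prefix: "
                + overlap(jobA, jobB).size());

        // Adding something unique per job to the prefix separates the id spaces.
        Set<String> jobA2 = idsFor("job-a/Sink: kafka", 0, poolSize);
        Set<String> jobB2 = idsFor("job-b/Sink: kafka", 0, poolSize);
        System.out.println("Shared ids with a job-specific prefix: "
                + overlap(jobA2, jobB2).size());
    }
}
```

Under this assumption, any two producers that share a transactional id would fence each other on the broker, which is consistent with the ProducerFencedException loop reported above. Whether this is the exact mechanism in the 1.7 connector is something the thread itself leaves open.
|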
Hi Tzu-Li,

Any update on this? This is consistently reproducible: same jar, separate source topics to separate destination topics. This is something of a blocker for our Flink upgrade. I tried with 1.7.2 but no luck.

org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:994)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:615)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:94)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:230)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.comcast.ips.transformation.EMMFlatMap.flatMap(EMMFlatMap.java:98)
    at com.comcast.ips.transformation.EMMFlatMap.flatMap(EMMFlatMap.java:33)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:67)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)

Thanks,
Rohan

On Wed, Feb 13, 2019 at 12:33 AM Tzu-Li (Gordon) Tai <[hidden email]> wrote:
|
Hi,

I just saw a JIRA opened for this: https://issues.apache.org/jira/browse/FLINK-11654. The JIRA ticket's description matches what I had in mind, and I can confirm the bug assessment.

Unfortunately, I currently do not have the capacity to provide a fix and test for this. In the meantime, I've made this a blocker for releasing 1.8.0. It would be great if someone could try out the proposed fix mentioned in the JIRA, see if it fixes the issue in your cases, and provide a PR for the patch.

Thanks,
Gordon

On Tue, Feb 19, 2019 at 9:46 AM Rohan Thimmappa <[hidden email]> wrote:
|