Hi,
What could cause the following exception?

org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:999)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:614)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:93)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:219)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.alexander.smirnov.FilterBMFunction.flatMap(FilterBMFunction.java:162)

Thank you,
Alex
|
Hi,
I think in general this means that your producer client is connecting not to the correct broker (the partition leader) but to a broker that is only a follower, and the follower cannot execute that request. However, I am not sure what causes this in the context of the FlinkKafkaProducer. Maybe Piotr (in CC) has an idea?

Best,
Stefan

> On 04.05.2018 at 15:45, Alexander Smirnov <[hidden email]> wrote:
|
Hi,
I think Stefan is right. A quick Google search points to this: https://stackoverflow.com/questions/47767169/kafka-this-server-is-not-the-leader-for-that-topic-partition

Please let us know whether changing your configuration solves the problem!

Piotrek
|
Thanks for the quick turnaround, Stefan and Piotr.

This issue is rarely reproducible, and I will keep an eye on it. Searching Stack Overflow, I found https://stackoverflow.com/questions/43378664/kafka-leader-election-causes-kafka-streams-crash

They say that the problem is fixed in version 0.10.2.1 of the Kafka producer, so I wonder which version the FlinkKafkaProducer integration uses. For earlier versions, it is proposed to use the configuration:
On Fri, May 4, 2018 at 4:58 PM Piotr Nowojski <[hidden email]> wrote:
|
FlinkKafkaProducer011 uses Kafka 0.11.0.2.
However, I'm not sure whether bumping the KafkaProducer version or upgrading Kafka would solve this issue. Which Kafka version are you using?

Piotrek
|
Hi Piotr,

I am using Kafka version 0.11.0.

On Sat, May 5, 2018 at 10:19 AM Piotr Nowojski <[hidden email]> wrote:
|
Hi,
Regardless of whether that fixes the problem, please consider upgrading to Kafka 0.11.0.2 or 1.0.1. The Kafka 0.11.0 release was quite messy, and it might be that the bug you have hit was fixed in 0.11.0.2. As a side note, as far as we know our FlinkKafkaProducer011 works fine with Kafka 1.0.x.

Piotrek
|
Thank you, Piotr.

On Mon, May 7, 2018 at 2:59 PM Piotr Nowojski <[hidden email]> wrote:
|
I've seen the same error while upgrading Kafka. We are using FlinkKafkaProducer011 and Kafka version 1.0.0. While upgrading to Kafka 1.1.0, each time a server was restarted, an already running Flink job failed with the same message.

Gerard
|
I am getting the same error. Is there a way to retry or ignore the error instead of killing the job?

Jayant Ameta

On Tue, May 22, 2018 at 7:58 PM gerardg <[hidden email]> wrote:
|
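One option that may help with the retry question is the Kafka producer's own retry settings. "Not the leader for that topic-partition" is a retriable error on the client side, and as far as I know the 0.11 client defaults `retries` to 0, so a single leader change can surface as a failed send. The sketch below only builds the `java.util.Properties` object; the class name `RetryingProducerConfig`, the broker address, and the concrete values (5 retries, 500 ms backoff) are assumptions for illustration, not settings anyone in this thread has confirmed.

```java
import java.util.Properties;

/** Hypothetical helper (not from the thread): producer settings that retry transient send errors. */
final class RetryingProducerConfig {

    /** Builds producer properties; the retry values are illustrative assumptions, tune them for your cluster. */
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Let the client refresh metadata and resend after retriable errors
        // such as a partition leader change.
        props.setProperty("retries", "5");
        // Wait between attempts so a new leader has time to be elected.
        props.setProperty("retry.backoff.ms", "500");
        // One in-flight request per connection keeps record order stable across retries.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        Properties props = build("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
        // With the Flink connector on the classpath, these properties would be handed
        // to the producer constructor, for example (topic name is a placeholder):
        // new FlinkKafkaProducer011<>("my-topic", new SimpleStringSchema(), props);
    }
}
```

For the "ignore" part, FlinkKafkaProducer011 also has setLogFailuresOnly(true), which logs send failures instead of failing the job. Records that fail are then dropped, so that only fits jobs that can tolerate data loss.
|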
Hi Jayant,

Can you provide more specific information? For example: your Flink version, the Kafka version that the Flink Kafka connector depends on, and the Kafka server version.

Thanks,
vino.

Jayant Ameta <[hidden email]> wrote on Tue, Sep 4, 2018 at 12:32 PM:
|
Flink: 1.4.2
flink-connector-kafka-0.11_2.11 (1.4.2)
Kafka: 0.10.1.0

Jayant Ameta

On Tue, Sep 4, 2018 at 10:16 AM vino yang <[hidden email]> wrote:
|
Hi Jayant,

Can you try connecting to the Kafka 0.10.x server via flink-connector-kafka-0.10 and see whether it still throws this exception?

Thanks,
vino.

Jayant Ameta <[hidden email]> wrote on Tue, Sep 4, 2018 at 1:20 PM:
|