Flink Kafka Producer Exception

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink Kafka Producer Exception

Navneeth Krishnan
Hi,

I have a kafka source and sink in my pipeline and when I start my job I get this error and the job goes to failed state. I checked the kafka node and everything looks good. Any suggestion on what is happening here? Thanks.

java.lang.Exception: Failed to send data to Kafka: The server disconnected before a response was received.
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:443)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:420)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:394)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:612)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:598)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at com.transformations.MyProcessor.flatMap(MyProcessor.java:115)
at com.transformations.MyProcessor.flatMap(MyProcessor.java:47)
at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:748)
Reply | Threaded
Open this post in threaded view
|

Re: Flink Kafka Producer Exception

Navneeth Krishnan
Hi, 

I'm receiving this error and due to which I'm not able to run my job. Any help is greatly appreciated. Thanks.

On Tue, Dec 12, 2017 at 10:21 AM, Navneeth Krishnan <[hidden email]> wrote:
Hi,

I have a kafka source and sink in my pipeline and when I start my job I get this error and the job goes to failed state. I checked the kafka node and everything looks good. Any suggestion on what is happening here? Thanks.

java.lang.Exception: Failed to send data to Kafka: The server disconnected before a response was received.
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:443)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:420)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:394)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:612)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:598)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at com.transformations.MyProcessor.flatMap(MyProcessor.java:115)
at com.transformations.MyProcessor.flatMap(MyProcessor.java:47)
at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:748)

Reply | Threaded
Open this post in threaded view
|

Re: Flink Kafka Producer Exception

Tzu-Li (Gordon) Tai
Hi Navneeth,

The exception you are getting is a Kafka NetworkException.
From the provided information I can’t really tell much and can only guess, but are you sure that the client / broker versions match?
It seems like that you are using 0.10; the default client version in the Flink Kafka 0.10 connector is 0.10.2.1.

Cheers,
Gordon

On 14 December 2017 at 5:37:31 AM, Navneeth Krishnan ([hidden email]) wrote:

I have a kafka source and sink in my pipeline and when I start my job I get this error and the job goes to failed state. I checked the kafka node and everything looks good. Any suggestion on what is happening here? Thanks.