Kafka integration error

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Kafka integration error

Stefanos Antaris
Hi to all,

i am trying to make Flink to work with Kafka but i always have the following exception. It works perfect on my laptop but when i try to use it on the cluster it always fails. 

java.lang.Exception
	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:222)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.ClosedChannelException
	at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
	at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127)
	at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79)
	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:759)
	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getMissingOffsetsFromKafka(LegacyFetcher.java:712)
	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:462)


Here is my pom.xml if it helps with the error

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.0.0</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.10</artifactId>
<version>1.0.0</version>
</dependency>


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>1.0.0</version>
</dependency>


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.10</artifactId>
<version>1.0.0</version>
</dependency>


</dependencies>

Best regards,
Stefanos
Reply | Threaded
Open this post in threaded view
|

Re: Kafka integration error

rmetzger0
Hi Stefanos,

this looks like an issue with Kafka. Which version does your broker have?
Can you check the logs of the broker you are trying to connect to?

On Fri, Mar 11, 2016 at 5:27 PM, Stefanos Antaris <[hidden email]> wrote:
Hi to all,

i am trying to make Flink to work with Kafka but i always have the following exception. It works perfect on my laptop but when i try to use it on the cluster it always fails. 

java.lang.Exception
	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:222)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.ClosedChannelException
	at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
	at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127)
	at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79)
	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:759)
	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getMissingOffsetsFromKafka(LegacyFetcher.java:712)
	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:462)


Here is my pom.xml if it helps with the error

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.0.0</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.10</artifactId>
<version>1.0.0</version>
</dependency>


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>1.0.0</version>
</dependency>


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.10</artifactId>
<version>1.0.0</version>
</dependency>


</dependencies>

Best regards,
Stefanos