Hi there,
Flink version: 1.0.0
Kafka version: 0.9.0.0
Env: local

I ran the command below:

    ./bin/flink run -c com.test.flink.FlinkTest test.jar --topic nginx-logs --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myGroup --partition.assignment.strategy round robin

But I got this error:

    java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(Ljava/lang/String;)Ljava/util/List;
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:194)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:164)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:131)

The code is below:

    DataStream<String> messageStream = env.addSource(
        new FlinkKafkaConsumer09<>("nginx-logs", new SimpleStringSchema(), parameterTool.getProperties()));

    messageStream.rebalance().map(new MapFunction<String, String>() {
        @Override
        public String map(String value) throws Exception {
            return "Kafka and Flink says: " + value;
        }
    }).print();

I searched for the error on Google, and the results suggest that it is a method of Kafka 0.9.0.1. Any idea?

Thanks.
|
I have tested Kafka 0.8.0.2 with Flink 1.0.0 and it works for me. I can't speak for Kafka 0.9.0.1.

On Wed, Mar 30, 2016 at 12:51 PM, Zhun Shen <[hidden email]> wrote:
|
Hi!
A "NoSuchMethodError" usually means that you compile and run against different versions of a library.

Make sure the version you reference in the IDE and the version on the cluster are the same.

Greetings,
Stephan

On Wed, Mar 30, 2016 at 9:42 AM, Balaji Rajagopalan <[hidden email]> wrote:
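One way to check for such a mismatch is to ask the JVM which jar a class was actually loaded from and whether the method named in the error exists there. The following is only a rough sketch: the class name `ClassOriginCheck` and its helpers are made up for illustration, and it has to run with the same classpath as the job for the result to mean anything.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;
import java.util.List;

// Hypothetical helper, not part of Flink or Kafka.
final class ClassOriginCheck {

    /** Returns the location a class was loaded from, or a note if the JVM does not report one. */
    static String originOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "(no code source reported)";
        }
        return source.getLocation().toString();
    }

    /** True if a public method with this name, parameter types and return type exists. */
    static boolean hasMethod(Class<?> type, String name, Class<?> returnType, Class<?>... params) {
        try {
            Method method = type.getMethod(name, params);
            return method.getReturnType().equals(returnType);
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Default to the class named in the stack trace above.
        String className = args.length > 0
                ? args[0]
                : "org.apache.kafka.clients.consumer.KafkaConsumer";
        // Load without initializing, so no static initializers run.
        Class<?> type = Class.forName(className, false, ClassOriginCheck.class.getClassLoader());
        System.out.println("loaded from: " + originOf(type));
        // The error descriptor (Ljava/lang/String;)Ljava/util/List; means
        // one String parameter and a java.util.List return type.
        System.out.println("partitionsFor(String) returning List: "
                + hasMethod(type, "partitionsFor", List.class, String.class));
    }
}
```

If the printed jar is not the version you expected, or the method check prints `false`, the classpath at run time differs from the one the connector was compiled against.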
|
Hi there,
I checked my build.gradle file. I use 'org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0', but I found that this lib is based on kafka-clients 0.9.0.1.

I want to use Flink streaming to consume Kafka's events in real time, but I'm confused by the different versions of Flink's libs. Which flink-connector-kafka is compatible with Kafka 0.9.0.0?

My environment is Kafka: 0.9.0.0, Flink: 1.0.0, Language: Java.

Part of my build.gradle:

    'org.apache.kafka:kafka_2.10:0.9.0.0',
    'org.apache.kafka:kafka-clients:0.9.0.0',
    'org.apache.flink:flink-java:1.0.0',
    'org.apache.flink:flink-streaming-java_2.10:1.0.0',
    'org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0',
    'org.apache.flink:flink-connector-kafka-base_2.10:1.0.0'

Any advice?

Thanks.
|
I am using Flink 1.0.0 with Kafka 0.9. It works fine for me. I use the following dependency.

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
        <version>1.0.0</version>
        <scope>provided</scope>
    </dependency>

Thanks

On Fri, Apr 1, 2016 at 10:46 AM, Zhun Shen <[hidden email]> wrote:
|
I followed the Kafka 0.8.0.0 example in the Flink docs.

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("partition.assignment.strategy", "range");

        DataStream<String> messageStream = env
            .addSource(new FlinkKafkaConsumer09<String>("nginx-logs", new SimpleStringSchema(), properties));

        messageStream
            .rebalance()
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) throws Exception {
                    return "Kafka and Flink says: " + value;
                }
            }).print();

        env.execute();
    }

I always get the error below:

    java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(Ljava/lang/String;)Ljava/util/List;
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:194)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:164)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:131)
|
Did you make sure the Flink connector version and the Flink version are the same? Also, for 0.8.0.0 you will have to use FlinkKafkaConsumer08.

On Fri, Apr 1, 2016 at 3:21 PM, Zhun Shen <[hidden email]> wrote:
|
Yeah, I mean I read the demo that uses FlinkKafkaConsumer08 (http://data-artisans.com/kafka-flink-a-practical-how-to/), then I wrote my program based on Kafka 0.9.0.0 and Flink 1.0.0.
|
The issue may be that you include Kafka twice:

1) You explicitly add "org.apache.kafka:kafka-clients:0.9.0.0"
2) You add "org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0", which internally adds "org.apache.kafka:kafka-clients:0.9.0.1"

These two Kafka versions may conflict. I would drop dependency (1) and simply let the FlinkKafkaConsumer pull in whatever dependency it needs by itself.

The 0.9.0.1 client that Flink uses internally should read fine from Kafka 0.9.0.0 brokers.

Greetings,
Stephan

On Fri, Apr 1, 2016 at 5:19 PM, Zhun Shen <[hidden email]> wrote:
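To see whether a class really is on the classpath more than once, one option is to list every location that provides its `.class` file. This is a minimal sketch under the assumption that it runs with the same classpath as the job; `DuplicateClassFinder` and `locationsOf` are hypothetical names chosen for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Flink or Kafka.
final class DuplicateClassFinder {

    /** Lists every classpath location visible to the loader that contains the given class. */
    static List<URL> locationsOf(String className, ClassLoader loader) {
        // A class named a.b.C is stored as the resource a/b/C.class.
        String resource = className.replace('.', '/') + ".class";
        try {
            return new ArrayList<>(Collections.list(loader.getResources(resource)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Default to the class named in the stack trace.
        String className = args.length > 0
                ? args[0]
                : "org.apache.kafka.clients.consumer.KafkaConsumer";
        List<URL> locations = locationsOf(className, DuplicateClassFinder.class.getClassLoader());
        for (URL location : locations) {
            System.out.println(location);
        }
        if (locations.size() > 1) {
            System.out.println("Found " + locations.size() + " copies; versions may conflict.");
        }
    }
}
```

More than one printed location for the Kafka consumer class would be consistent with two kafka-clients jars being pulled in, as described above.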
|
I created a new project and added only the kafka-clients, Flink Kafka connector and Flink streaming libs, and it works.
Thanks.
|