Hi all,

I am preparing a Kafka and Flink performance test. To avoid mistakes on my side, I downloaded the Kafka example from http://kafka.apache.org/ and the Flink streaming Kafka example from http://flink.apache.org. I ran both producer examples on the same cluster. The kafka.apache.org example ran without any issues, but I received the errors below when I ran the Apache Flink Kafka producer. I have also posted both code snippets for your reference. Please take a look. Thanks.

Exception in thread "main" java.lang.Error: Unresolved compilation problems:
    The import kafka.consumer cannot be resolved
    The import kafka.consumer cannot be resolved
    The import kafka.consumer cannot be resolved
    The import kafka.consumer cannot be resolved
    The import kafka.javaapi cannot be resolved
    ConsumerConnector cannot be resolved to a type
    ConsumerIterator cannot be resolved to a type
    ConsumerConnector cannot be resolved to a type
    Consumer cannot be resolved
    ConsumerConfig cannot be resolved to a type
    KafkaStream cannot be resolved to a type
    ConsumerConnector cannot be resolved to a type
    KafkaStream cannot be resolved to a type
    KafkaStream cannot be resolved to a type
    ConsumerConnector cannot be resolved to a type
    ConsumerIterator cannot be resolved to a type
    ConsumerIterator cannot be resolved to a type
    ConsumerIterator cannot be resolved to a type
    ConsumerConnector cannot be resolved to a type
    ConsumerConnector cannot be resolved to a type
    ConsumerConnector cannot be resolved to a type

    at org.apache.flink.streaming.connectors.kafka.api.KafkaSource.<init>(KafkaSource.java:26)
    at org.apache.flink.streaming.connectors.kafka.KafkaConsumerExample.main(KafkaConsumerExample.java:42)

Here is the Apache Flink example:

*************************************Apache Flink***********************************************************************

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);

@SuppressWarnings({ "unused", "serial" })
DataStream<String> stream1 = env.addSource(new SourceFunction<String>() {

    public void run(Collector<String> collector) throws Exception {
        for (int i = 0; i < 20; i++) {
            collector.collect("message #" + i);
            Thread.sleep(100L);
        }
        collector.collect(new String("q"));
    }

    public void cancel() {
    }
}).addSink(
        new KafkaSink<String>(host + ":" + port, topic, new JavaDefaultStringSchema()))
  .setParallelism(3);

System.out.println(host + " " + port + " " + topic);
env.execute();

**********************************Apache Kafka***************************************************************

public Producer(String topic) {
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("metadata.broker.list", "192.168.0.112:9092");
    // Use random partitioner. We don't need the key type, so just set it to Integer.
    // The message is of type String.
    producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
    this.topic = topic;
}

public void run() {
    int messageNo = 1;
    while (true) {
        String messageStr = new String("LA_" + messageNo);
        producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
        messageNo++;
    }
}

Best regards
Hawin
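For context, the Kafka snippet above shows only the constructor and run() bodies. A minimal sketch of the scaffolding it assumes might look like the following; the class name KafkaProducerExample, the field declarations, the main() method, and the topic name are assumptions for illustration, not part of the original post:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducerExample extends Thread {

    // Assumed field declarations; the original snippet uses these names
    // without showing where they are defined.
    private final Properties props = new Properties();
    private final Producer<Integer, String> producer;
    private final String topic;

    public KafkaProducerExample(String topic) {
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "192.168.0.112:9092");
        // Random partitioner; the key type is unused, so Integer is fine.
        producer = new Producer<Integer, String>(new ProducerConfig(props));
        this.topic = topic;
    }

    @Override
    public void run() {
        int messageNo = 1;
        while (true) {
            producer.send(new KeyedMessage<Integer, String>(topic, "LA_" + messageNo));
            messageNo++;
        }
    }

    public static void main(String[] args) {
        // Topic name is an assumption for illustration.
        new KafkaProducerExample("flink-kafka-topic").start();
    }
}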
Dear Hawin,

This looks like a dependency issue: the Java compiler does not find the Kafka dependency. How are you trying to run this example? Is it from an IDE, or are you submitting it to a Flink cluster with bin/flink run? How do you define your dependencies; do you use Maven or sbt, for instance?

Best,
Marton
Dear Marton,

Thanks for the support again. I am running these examples in the same project, and I am using the Eclipse IDE to submit them to my Flink cluster. Here are my dependencies:

******************************************************************************
<dependencies>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>0.9.0-milestone-1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>0.9.0-milestone-1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-connectors</artifactId>
    <version>0.9.0-milestone-1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-core</artifactId>
    <version>0.9.0-milestone-1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.6.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-auth</artifactId>
    <version>2.6.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.2.1</version>
  </dependency>
</dependencies>
*****************************************************************************************

Best regards
Email: [hidden email]
Dear Hawin,

No problem, I am glad that you are giving our Kafka connector a try. :) The dependencies listed look good. Can you run the example locally from Eclipse with 'Run'? I suspect that your Flink cluster does not have access to the Kafka dependency then. As a quick test you could copy the Kafka jars to the lib folder of your Flink distribution on all the machines in your cluster; everything that is there goes onto Flink's classpath. Another workaround would be to build a fat jar for your project containing all the dependencies with 'mvn assembly:assembly'. Neither of these is beautiful, but they would help track down the root cause.

On Thu, Jun 11, 2015 at 10:04 AM, Hawin Jiang <[hidden email]> wrote:
Dear Marton,

What do you mean by running it locally in Eclipse with 'Run'? Do you want me to run it on the NameNode? Kafka is not installed on my NameNode; I only installed Kafka on my DataNode servers. Do I need to install Kafka or copy the Kafka jars onto the NameNode? Actually, I don't want to install everything on the NameNode server. Please advise me. Thanks.

My Flink and Hadoop cluster info is as below:

Flink on NameNode
Kafka, Zookeeper and Flink slave1 on Datanode1
Kafka, Zookeeper and Flink slave2 on Datanode2
Kafka, Zookeeper and Flink slave3 on Datanode3

On Thu, Jun 11, 2015 at 1:16 AM, Márton Balassi <[hidden email]> wrote:
By "locally" I meant the machine that you use for development, to see whether this works without parallelism. :-) No need to install stuff on your NameNode, of course. Installing Kafka on a machine and having the Kafka Java dependencies available for Flink are two very different things.

Try adding the following [1] to your Maven pom. Then execute 'mvn assembly:assembly'; this will produce a fat jar suffixed jar-with-dependencies.jar. You should be able to run the example from that.

On Thu, Jun 11, 2015 at 10:32 AM, Hawin Jiang <[hidden email]> wrote:
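Since the pom linked as [1] is not reproduced in this thread, here is a minimal sketch of an assembly-plugin section that makes 'mvn assembly:assembly' produce a jar-with-dependencies jar; the plugin version is an assumption:

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <!-- Version is an assumption; use whatever your build standardizes on. -->
      <version>2.4</version>
      <configuration>
        <descriptorRefs>
          <!-- Built-in descriptor that bundles all dependencies into one jar. -->
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
    </plugin>
  </plugins>
</build>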
Hi Marton,

Should I add the whole pom.xml file, or only the plugin as below? I saw that lines 286 to 296 of that pom.xml contain incorrect information. Thanks.
On Thu, Jun 11, 2015 at 1:43 AM, Márton Balassi <[hidden email]> wrote:
I use the following dependencies and it works fine.

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>0.9-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>0.9-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-core</artifactId>
  <version>0.9-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>0.9-SNAPSHOT</version>
</dependency>
<dependency>

On Mon, Jun 22, 2015 at 10:07 PM, Hawin Jiang <[hidden email]> wrote:
Dear Hawin,

Sorry, I have managed to link to a pom that has been changed in the meantime. But we have added a section to our doc clarifying your question [1].

Since then, Stephan has proposed an even nicer solution that has not made it into the doc yet: if you start from our quickstart pom and add your dependencies to it, then simply executing 'mvn package -Pbuild-jar' gives you a jar with all the code that is needed to run it on the cluster, but not more. See [3] for more on the quickstart.

On Tue, Jun 23, 2015 at 6:48 AM, Ashutosh Kumar <[hidden email]> wrote:
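For reference, a quickstart project can be generated with the Flink Maven archetype, roughly as follows (a sketch; the archetype version 0.9.0 is an assumption matching the Flink version used later in this thread):

mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=0.9.0

Adding your dependencies to the generated pom and running 'mvn package -Pbuild-jar' then yields the trimmed-down jar described above.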
Dear Marton,

I have upgraded my Flink to 0.9.0, but I could not consume data from Kafka with Flink. I have followed your example exactly. Please help me. Thanks.

Here is my code:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);

DataStream<String> kafkaStream = env
        .addSource(new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()));
kafkaStream.print();

env.execute();

Here are some errors:

15/06/25 22:57:52 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flink-group_hawin-1435298168910-10520844-0-1], Shutting down
15/06/25 22:57:52 INFO consumer.SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedByInterruptException
15/06/25 22:57:52 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flink-group_hawin-1435298168910-10520844-0-1], Stopped
15/06/25 22:57:52 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flink-group_hawin-1435298168910-10520844-0-1], Shutdown completed
15/06/25 22:57:52 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1435298169147] All connections stopped
15/06/25 22:57:52 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
15/06/25 22:57:52 INFO zookeeper.ZooKeeper: Session: 0x14e2e5b2dad000a closed
15/06/25 22:57:52 INFO consumer.ZookeeperConsumerConnector: [flink-group_hawin-1435298168910-10520844], ZKConsumerConnector shutdown completed in 40 ms
15/06/25 22:57:52 ERROR tasks.SourceStreamTask: Custom Source -> Stream Sink (3/4) failed
org.apache.commons.lang3.SerializationException: java.io.StreamCorruptedException: invalid stream header: 68617769
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:232)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
    at org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema.deserialize(JavaDefaultStringSchema.java:40)
    at org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema.deserialize(JavaDefaultStringSchema.java:24)
    at org.apache.flink.streaming.connectors.kafka.api.KafkaSource.run(KafkaSource.java:193)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:49)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:55)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid stream header: 68617769
    at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806)
    at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:222)
    ... 8 more

On Tue, Jun 23, 2015 at 6:31 AM, Márton Balassi <[hidden email]> wrote:
Dear Marton,

Here are some errors when I run KafkaProducerExample.java from Eclipse:

kafka.common.KafkaException: fetching topic metadata for topics [Set(flink-kafka-topic)] from broker [ArrayBuffer(id:0,host:192.168.0.112,port:2181)] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
    at kafka.utils.Utils$.swallow(Utils.scala:172)
    at kafka.utils.Logging$class.swallowError(Logging.scala:106)
    at kafka.utils.Utils$.swallowError(Utils.scala:45)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
    at kafka.producer.Producer.send(Producer.scala:77)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at org.apache.flink.streaming.connectors.kafka.api.KafkaSink.invoke(KafkaSink.java:183)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:102)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
    at kafka.utils.Utils$.read(Utils.scala:381)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)

On Thu, Jun 25, 2015 at 11:06 PM, Hawin Jiang <[hidden email]> wrote:
Hi,

Could you please try replacing JavaDefaultStringSchema() with SimpleStringSchema() in your first example, the one where you get this exception:

org.apache.commons.lang3.SerializationException: java.io.StreamCorruptedException: invalid stream header: 68617769

Cheers,
Aljoscha

On Fri, 26 Jun 2015 at 08:21 Hawin Jiang <[hidden email]> wrote:
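Concretely, the source declaration from the earlier mail would then read roughly as follows (a sketch, assuming the same host, port and topic variables and the Flink 0.9 serialization package):

// Assumed import for Flink 0.9:
// import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

DataStream<String> kafkaStream = env
        .addSource(new KafkaSource<String>(host + ":" + port, topic,
                new SimpleStringSchema()));  // instead of JavaDefaultStringSchema
kafkaStream.print();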
Hi Aljoscha,

You are the best. Thank you very much. It is working now.

Best regards
Hawin

On Fri, Jun 26, 2015 at 12:28 AM, Aljoscha Krettek <[hidden email]> wrote: