Here is my simple program to use Kafka:
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.api.common.typeinfo._

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
      .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
      .addSink(new KafkaSink[String]("localhost:2181", "test", new JavaDefaultStringSchema))
    env.execute("Test Kafka")
  }
}

In build.sbt:

libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-scala" % "0.9.0",
  "org.apache.flink" % "flink-clients" % "0.9.0")
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1"
libraryDependencies += "org.apache.flink" % "flink-streaming-scala" % "0.9.0"
libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "0.9.0" exclude("org.apache.kafka", "kafka_${scala.binary.version}")
libraryDependencies += "com.101tec" % "zkclient" % "0.5"

I'm using "sbt assembly" to build a fat jar, so the target jar file is supposed to contain everything.
However, there are errors when running the target jar file:

java.lang.NoClassDefFoundError: org/I0Itec/zkclient/serialize/ZkSerializer
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:69)
        at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:105)
        at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
        at org.apache.flink.streaming.connectors.kafka.api.KafkaSource.initializeConnection(KafkaSource.java:175)
        at org.apache.flink.streaming.connectors.kafka.api.KafkaSource.open(KafkaSource.java:207)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:33)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:56)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openOperator(StreamTask.java:158)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:52)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.I0Itec.zkclient.serialize.ZkSerializer
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        .........

I already put zkclient-0.5.jar under <flink_root_dir>/lib. Can anyone shed some light? Thanks for the help!

Wendong
To be specific, the error occurs at:

org.apache.flink.streaming.connectors.kafka.api.KafkaSource.initializeConnection(KafkaSource.java:175)
Hi,

Are you running this locally or in a cluster environment? Did you put the zkclient-0.5.jar in the /lib directory of every node (including the task managers)?
Hi Max,
The program runs locally on one machine. I ran "grep ZkSerializer" against the generated fat jar and the class is there, so the build process seems OK. I also put zkclient-0.5.jar under <flink_root_dir>/lib/, and it contains the ZkSerializer class.

Thanks,
Wendong
I also tried using zkclient-0.3.jar in lib/, updated build.sbt, and rebuilt. It doesn't help; I still get the same NoClassDefFoundError: ZkSerializer in flink.streaming.connectors.kafka.api.KafkaSource.open().
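As an aside (a sketch of mine, not from the thread): instead of grepping the fat jar binary, which can match stray strings, the jar's entry list can be inspected programmatically. The jar path and class-name fragment passed on the command line below are just examples:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class FatJarCheck {

    // Return all jar entries whose path contains the given substring.
    static List<String> entriesContaining(String jarPath, String needle) throws IOException {
        List<String> hits = new ArrayList<String>();
        JarFile jar = new JarFile(jarPath);
        try {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (name.contains(needle)) {
                    hits.add(name);
                }
            }
        } finally {
            jar.close();
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.out.println("usage: FatJarCheck <jar> <class-name-fragment>");
            return;
        }
        // e.g. FatJarCheck target/scala-2.10/myjob-assembly-1.0.jar ZkSerializer
        for (String entry : entriesContaining(args[0], args[1])) {
            System.out.println(entry);
        }
    }
}
```

If the class entry shows up here but the job still fails at runtime, the jar itself is fine and the problem lies in which classloader sees it.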
Maybe you can post your pom.xml file so we can identify the issue.

Best regards,
Hawin
Hi Hawin,
I'm using sbt as shown in the original post. I tried using Maven and pom.xml, but got a different NoClassDefFoundError: com/yammer/metrics/Metrics. I've downloaded metrics-core-2.2.0.jar into lib/ but it doesn't help. The errors from sbt and Maven seem to be of the same nature.

Here is my pom.xml (standard parts are omitted to make it more readable):

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala</artifactId>
    <version>0.9.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala</artifactId>
    <version>0.9.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>0.9.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>0.9.0</version>
  </dependency>
  <dependency>
    <groupId>com.yammer.metrics</groupId>
    <artifactId>metrics-core</artifactId>
    <version>2.2.0</version>
  </dependency>
</dependencies>

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-dependency-plugin</artifactId>
      <version>2.9</version>
      <executions>
        <execution>
          <id>unpack</id>
          <phase>prepare-package</phase>
          <goals>
            <goal>unpack</goal>
          </goals>
          <configuration>
            <artifactItems>
              <artifactItem>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka</artifactId>
                <version>0.9.0</version>
                <type>jar</type>
                <overWrite>false</overWrite>
                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                <includes>org/apache/flink/**</includes>
              </artifactItem>
              <artifactItem>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.10</artifactId>
                <version>0.8.2.0</version>
                <type>jar</type>
                <overWrite>false</overWrite>
                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                <includes>kafka/**</includes>
              </artifactItem>
            </artifactItems>
          </configuration>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.1</version>
      <configuration>
        <source>1.6</source>
        <target>1.6</target>
      </configuration>
    </plugin>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>3.1.4</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

<profiles>
  <profile>
    <id>build-jar</id>
    <activation>
      <activeByDefault>false</activeByDefault>
    </activation>
    <dependencies>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala</artifactId>
        <version>0.9.0</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-core</artifactId>
        <version>0.9.0</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>0.9.0</version>
        <scope>provided</scope>
      </dependency>
    </dependencies>
  </profile>
</profiles>
Hi Wendong
Please make sure you have the dependencies below. Good luck.

*********************************************************************
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>0.9.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>0.9.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-core</artifactId>
  <version>0.9.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-connectors</artifactId>
  <version>0.9.0-milestone-1</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.8.2.1</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.8.2.1</version>
</dependency>
******************************************************************

Best regards
Hawin

-----Original Message-----
From: Wendong
Sent: Tuesday, July 21, 2015 5:16 PM
Subject: Re: Flink Kafka cannot find org/I0Itec/zkclient/serialize/ZkSerializer
Hi Wendong,

I don’t think that you have to include

Cheers,
Here are some comments on Java classloading:

- The Zookeeper code is implicitly loaded by the Kafka code.
- When Java implicitly loads a class at some point in the program, it uses the classloader of the class at that point in the program. Here it will use the classloader that loaded the Kafka code.
- You have to make sure that the zookeeper dependency is where the kafka connector is. It will not work to have kafka in /lib and zookeeper in the fat jar, for example. But putting everything into the /lib folder should work.

Note: It's important that this is available on all worker machines.
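To make the classloading point concrete, here is a small debugging sketch (mine, not from the thread) that prints which classloader, and which jar, a class was resolved from. Running something like this from inside the job helps distinguish a class that is truly missing from one that is present but visible only to the wrong loader; the ZkSerializer class name is just an example target:

```java
public class ClassLoaderCheck {

    // Report which classloader and code location a class resolves from,
    // using the caller's classloader (the same mechanism implicit loading uses).
    static String whereIs(String className) {
        try {
            Class<?> cls = Class.forName(className);
            ClassLoader loader = cls.getClassLoader();
            // JVM-internal classes report a null loader (the bootstrap classloader).
            String loaderDesc = (loader == null) ? "bootstrap classloader" : loader.toString();
            java.security.CodeSource src = cls.getProtectionDomain().getCodeSource();
            String location = (src == null) ? "(no code source)" : src.getLocation().toString();
            return className + " <- " + loaderDesc + " <- " + location;
        } catch (ClassNotFoundException e) {
            return className + " NOT FOUND on this classpath";
        }
    }

    public static void main(String[] args) {
        System.out.println(whereIs("java.lang.String"));
        System.out.println(whereIs("org.I0Itec.zkclient.serialize.ZkSerializer"));
    }
}
```

If the second line prints "NOT FOUND" when run where the Kafka connector code lives, that matches the situation Stephan describes: the zookeeper classes are not visible to the loader that loaded Kafka.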
Thanks Stephan and Till.
Since I couldn't make a working example of Flink and Kafka after struggling for a week, I have to temporarily stop the evaluation work and switch to other tasks. I hope that in the near future someone can come up with a working KafkaWordCount sample, similar to the Spark one; it would greatly benefit the community.

Wendong
Wendong,

Sorry to hear that you are having such trouble with the example. We are using the Kafka connector with many people, building the examples with Maven, and it works without any problems. Maybe SBT is just not handling these dependencies correctly, or the SBT script defines the dependencies incorrectly.

Stephan
Can you share your full sbt build file with me? I'm trying to reproduce the issue, but I have never used sbt before. I was able to configure the assembly plugin, but the produced fat jar didn't contain the zkclient. Maybe your full sbt build file would help me identify the issue faster.

Let me know if you would like to work with Maven. I'm pretty sure that we can get it to work with Maven.
Below is the build.sbt I am using (project/assembly.sbt is also included):
//------------- Start build.sbt ---------------------------
version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-scala" % "0.9.0",
  "org.apache.flink" % "flink-clients" % "0.9.0")
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1"
libraryDependencies += "org.apache.flink" % "flink-streaming-scala" % "0.9.0"
libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "0.9.0" exclude("org.apache.kafka", "kafka_${scala.binary.version}")
libraryDependencies += "com.101tec" % "zkclient" % "0.3"

// META-INF discarding
mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
  }
}
//------------- End build.sbt ---------------------------

//--------- Start project/assembly.sbt --------------
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")
//--------- End project/assembly.sbt --------------
Thank you for posting the full SBT files.
I now understand why you exclude the kafka dependency from Flink. SBT does not support reading Maven properties that are only defined in profiles. I will fix the issue for Flink 0.10 (https://issues.apache.org/jira/browse/FLINK-2408).

I was not able to reproduce the issue locally. What I did:
- "sbt assembly" --> which created /home/robert/Downloads/flink-sbt-master/target/scala-2.10/sbt-0.13/flink-sbt-with-assembly-assembly-1.0.jar
- "./bin/start-local.sh"
- "./bin/flink run -c org.myorg.quickstart.Job /home/robert/Downloads/flink-sbt-master/target/scala-2.10/sbt-0.13/flink-sbt-with-assembly-assembly-1.0.jar" --> no errors.
Just to confirm: are you able to compile and run a Kafka test program similar to the following?
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.api.common.typeinfo._

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
      .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
      .addSink(new KafkaSink[String]("localhost:2181", "test", new JavaDefaultStringSchema))
    env.execute("Test Kafka")
  }
}
Yes, I was running exactly that code. This is a repository containing the files: https://github.com/rmetzger/scratch/tree/flink-sbt-master