Hi,
I couldn't get flink and kafka working together. It looks like all examples I tried from web site fails with the following Exception. Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase or when I do something like this like it is in the website val stream = senv.addSource(new FlinkKafkaConsumer08[String]("join_test", new SimpleStringSchema(), properties)).print() <console>:73: error: overloaded method value addSource with alternatives: [T](function: org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext[T] => Unit)(implicit evidence$10: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T] <and> [T](function: org.apache.flink.streaming.api.functions.source.SourceFunction[T])(implicit evidence$9: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T] cannot be applied to (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08[String])
val stream = senv.addSource(new FlinkKafkaConsumer08[String]("join_test", new SimpleStringSchema(), properties)).print() can anyone share a simple example of how to get Kafka Stream as a Table using scala shell? No need for any fancy schema just needs to print the value. I am using the latest version of flink 1.41 and my lib folder containers flink-connector-kafka-0.8_2.11-1.4.1.jar I wanted to use Kafka 0.9 but that didn't work so I thought let me just get something working first and downgraded to 0.8 but 0.8 examples on the website also don't seem to work using scala shell. Thanks!! |
Exception went away after downloading flink-connector-kafka-base_2.11-1.4.1.jar to lib folder On Sat, Feb 24, 2018 at 6:36 PM, kant kodali <[hidden email]> wrote:
|
Hi, Good to see that you have it working! Yes, each of the Kafka version-specific connectors also have a dependency on the base Kafka connector module. Note that it is usually not recommended to put optional dependencies (such as the connectors) under the lib folder. To add additional dependencies when using the Scala shell, there is a “—addclasspath” option which allows you to specify paths to the dependency jars. Gordon
On 25 February 2018 at 12:22:28 PM, kant kodali ([hidden email]) wrote:
|
Hi Gordon, Thanks for the response!! How do I add multiple jars to the classpaths? Are they separated by a semicolon and still using one flag like "—addclasspath jar1; jar2" or specify the flag multiple times like "—addclasspath jar1 —addclasspath jar2" or specify just the directory "—addclasspath ./opt" so it adds all the jars in that directory! Thanks! On Sun, Feb 25, 2018 at 11:29 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
|
Judging from the code, you should separate different jars with a colon
":", i.e. "—addclasspath jar1:jar2" Nico On 26/02/18 10:36, kant kodali wrote: > Hi Gordon, > > Thanks for the response!! How do I add multiple jars to the classpaths? > Are they separated by a semicolon and still using one flag like > "—addclasspath jar1; jar2" or specify the flag multiple times like > "—addclasspath jar1 —addclasspath jar2" or specify just the directory > "—addclasspath ./opt" so it adds all the jars in that directory! > > Thanks! > > On Sun, Feb 25, 2018 at 11:29 PM, Tzu-Li (Gordon) Tai > <[hidden email] <mailto:[hidden email]>> wrote: > > Hi, > > Good to see that you have it working! Yes, each of the Kafka > version-specific connectors also have a dependency on the base Kafka > connector module. > > Note that it is usually not recommended to put optional dependencies > (such as the connectors) under the lib folder. > To add additional dependencies when using the Scala shell, there is > a “—addclasspath” option which allows you to specify paths to the > dependency jars. > > Cheers, > Gordon > > > On 25 February 2018 at 12:22:28 PM, kant kodali ([hidden email] > <mailto:[hidden email]>) wrote: > >> Exception went away after >> downloading flink-connector-kafka-base_2.11-1.4.1.jar to lib folder >> >> On Sat, Feb 24, 2018 at 6:36 PM, kant kodali <[hidden email] >> <mailto:[hidden email]>> wrote: >> >> Hi, >> >> I couldn't get flink and kafka working together. It looks like >> all examples I tried from web site fails with the following >> Exception. >> >> Caused by: java.lang.ClassNotFoundException: >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase >> >> >> *or when I do something like this like it is in the website* >> >> >> val stream = senv.addSource(new >> FlinkKafkaConsumer08[String]("join_test", new >> SimpleStringSchema(), properties)).print() >> >> *I get the following exception* >> >> <console>:73: error: overloaded method value addSource with >> alternatives: >> >> [T](function: >> org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext[T] >> => Unit)(implicit evidence$10: >> org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T] >> <and> >> >> [T](function: >> org.apache.flink.streaming.api.functions.source.SourceFunction[T])(implicit >> evidence$9: >> org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T] >> >> cannot be applied to (org.apache.flink.streaming.co >> <http://org.apache.flink.streaming.co>nnectors.kafka.FlinkKafkaConsumer08[String]) >> >> val stream = senv.addSource(new >> FlinkKafkaConsumer08[String]("join_test", new >> SimpleStringSchema(), properties)).print() >> >> >> can anyone share a simple example of how to get Kafka Stream >> as a Table using scala shell? No need for any fancy schema >> just needs to print the value. I am using the latest version >> of flink 1.41 and my lib folder >> containers flink-connector-kafka-0.8_2.11-1.4.1.jar >> >> I wanted to use Kafka 0.9 but that didn't work so I thought >> let me just get something working first and downgraded to 0.8 >> but 0.8 examples on the website also don't seem to work using >> scala shell. >> >> Thanks!! >> >> >> >> > signature.asc (201 bytes) Download Attachment |
Thanks a lot! On Mon, Feb 26, 2018 at 9:19 AM, Nico Kruber <[hidden email]> wrote: Judging from the code, you should separate different jars with a colon |
Free forum by Nabble | Edit this page |