Hello,
Does anyone have a simple example of Flink Kafka written in Scala? I've been struggling to make my test program work. Below is my program, which has an error in addSink (the KafkaWordCountProducer part is copied from the Spark sample program):

import java.util.HashMap

import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}

import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
      .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
      .addSink(new KafkaSink[String]("localhost:2181", "test", new SimpleStringSchema))

    println("Hi TestKafka")
    env.execute("Test Kafka")
  }
}

object KafkaWordCountProducer {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Zookeeper connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send some messages
    while (true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")
        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }
      Thread.sleep(1000)
    }
  }
}

There is a compilation error:

[error] .................TestKafka.scala:15: type mismatch;
[error]  found   : org.apache.flink.streaming.util.serialization.SimpleStringSchema
[error]  required: org.apache.flink.streaming.util.serialization.SerializationSchema[String,Array[Byte]]
[error]     .addSink(new KafkaSink[String]("localhost:2181", "test", new SimpleStringSchema))

I changed SimpleStringSchema to SerializationSchema, which still doesn't work.

I am trying to transition from Spark to Flink, but there are far fewer samples in Flink than in Spark. It would be very helpful to have a KafkaWordCount example in Scala similar to the one in Spark.

Thanks,

Wendong
Have you tried to replace

import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._

with

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.api.KafkaSource
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

I agree with you that the examples in the user guide might underestimate the importance of imports. The import statements should be in the example.

Anwar.

On Thu, Jul 16, 2015 at 2:45 AM, Wendong <[hidden email]> wrote:

> Hello,
I tried, but got error:
[error] TestKafka.scala:11: object scala is not a member of package org.apache.flink.streaming.api
[error] import org.apache.flink.streaming.api.scala._

So I switched back to my original import statements.

Now I changed SimpleStringSchema to JavaDefaultStringSchema in addSink(new KafkaSink(...)), and the compilation error was gone. The problem is that there is a runtime error:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
        at org.apache.flink.client.program.Client.run(Client.java:315)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: java.lang.RuntimeException: Data stream sinks cannot be copied
        at org.apache.flink.streaming.api.datastream.DataStreamSink.copy(DataStreamSink.java:43)
        at org.apache.flink.streaming.api.datastream.DataStreamSink.copy(DataStreamSink.java:30)
        at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1341)
        at org.apache.flink.streaming.api.datastream.DataStream.print(DataStream.java:1029)
        ...........

I googled the error message but didn't find useful information. Can anyone shed some light?

Thanks!

Wendong
The compilation error is because you don't define a dependency on flink-streaming-scala. In sbt, you define something like:

libraryDependencies += "org.apache.flink" % "flink-streaming-scala" % "0.9.0"

On Thu, Jul 16, 2015 at 6:36 AM, Wendong <[hidden email]> wrote:

> I tried, but got error:
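For reference, a minimal build.sbt sketch that pulls in both the Scala streaming API and the Kafka connector might look like the following. This is only a sketch, assuming Flink 0.9.0 and the module names from that release:

```scala
// build.sbt (sketch, assuming Flink 0.9.0 module names)
libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-streaming-scala" % "0.9.0",
  "org.apache.flink" % "flink-connector-kafka" % "0.9.0"
)
```

With both modules on the classpath, the `org.apache.flink.streaming.api.scala._` import from the earlier reply should resolve.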
Hi,
your first example doesn't work because SimpleStringSchema does not work for sinks. You can use this modified serialization schema: https://gist.github.com/aljoscha/e131fa8581f093915582. It works for both sources and sinks. (I think the current SimpleStringSchema is incorrect and should be changed in the next release.)

Cheers,
Aljoscha

On Thu, 16 Jul 2015 at 08:37 Anwar Rizal <[hidden email]> wrote:
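At its core, a string schema that works in both directions just needs a UTF-8 round trip between String and Array[Byte]. A minimal, Flink-independent sketch of that logic (the object name is illustrative, not taken from the gist):

```scala
// Sketch of the serialize/deserialize round trip that a combined
// source/sink string schema would delegate to. Hypothetical helper,
// shown only to illustrate the two conversions involved.
object StringRoundTrip {
  // String -> bytes, as a sink-side SerializationSchema would need
  def serialize(value: String): Array[Byte] =
    value.getBytes("UTF-8")

  // bytes -> String, as a source-side DeserializationSchema would need
  def deserialize(bytes: Array[Byte]): String =
    new String(bytes, "UTF-8")
}
```

For example, `StringRoundTrip.deserialize(StringRoundTrip.serialize("hello"))` gives back "hello".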
|
Thanks! I tried your updated MySimpleStringSchema and it works for both source and sink.
However, my problem is the runtime error "Data stream sinks cannot be copied" listed in my previous post. I hope someone has run into the problem before and can give a hint.

Wendong
Hey,
The reason you are getting that error is that you are calling print after adding a sink, which is an invalid operation. Remove either addSink or print :)

Cheers,
Gyula

On Thu, Jul 16, 2015 at 7:37 PM Wendong <[hidden email]> wrote:

> Thanks! I tried your updated MySimpleStringSchema and it works for both
Hi Gyula,
Cool. I removed .print and the error was gone. However, env.execute failed with errors:

.........
Caused by: java.lang.Exception: Call to registerInputOutput() of invokable failed
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:504)
.......
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:185)
.......
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.api.KafkaSink
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)

In the following code:

val stream = env
  .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
  .addSink(new KafkaSink[String]("localhost:2181", "test", new SimpleStringSchema))

Anything wrong? I already did "import org.apache.flink.streaming.connectors.kafka.api._". Class SimpleStringSchema was modified (see previous post).

Thanks,
Wendong
Hi,
this looks like the flink-connector-kafka jar is not available where the job is running. Did you put it in Flink's lib folder on all the machines, or did you submit it with the job?

On Thu, Jul 16, 2015, 21:05 Wendong <[hidden email]> wrote:

> Hi Gyula,
These two links [1, 2] might help to get your job running. The first link describes how to set up a job using Flink's machine learning library, but it also works for the flink-connector-kafka library.

Cheers,
Till

On Fri, Jul 17, 2015 at 8:42 AM, Aljoscha Krettek <[hidden email]> wrote:
|
In reply to this post by Aljoscha Krettek
Hi Aljoscha,
Yes, the flink-connector-kafka jar file is under Flink's lib directory:

flink-0.9.0/lib/flink-connector-kafka-0.9.0.jar

and it shows that the KafkaSink class exists:

$ jar tf lib/flink-connector-kafka-0.9.0.jar | grep KafkaSink
org/apache/flink/streaming/connectors/kafka/api/KafkaSink.class

I cannot think of any reason why KafkaSink is not found:

.........
Caused by: java.lang.Exception: Call to registerInputOutput() of invokable failed
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:504)
.......
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:185)
.......
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.api.KafkaSink
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)

Any clue?

Thanks,
Wendong
In reply to this post by Till Rohrmann
Hi Till,
Thanks for the information. I'm using sbt and I have the following line in build.sbt:

libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "0.9.0" exclude("org.apache.kafka", "kafka_${scala.binary.version}")

Also, I copied flink-connector-kafka-0.9.0.jar under <flink_root_dir>/lib/, but there is still a ClassNotFoundException for KafkaSink. I'd appreciate any suggestions.

Wendong
Hi Wendong,
why do you exclude the kafka dependency from `flink-connector-kafka`? Do you want to use your own Kafka version?

I'd recommend building a fat jar instead of trying to put the right dependencies in `/lib`. Here [1] you can see how to build a fat jar with sbt.

Cheers,
Till

On Sat, Jul 18, 2015 at 12:40 AM, Wendong <[hidden email]> wrote:

> Hi Till,
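For reference, fat jars with sbt are commonly built with the sbt-assembly plugin. A minimal sketch follows; the plugin version and the "provided" scoping are assumptions here (versions from mid-2015), not something prescribed by the thread:

```scala
// project/assembly.sbt (sketch; plugin version is illustrative)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

// build.sbt (sketch): mark the core streaming dependency as "provided"
// so that only the Kafka connector and your own classes end up in the
// fat jar, since the Flink runtime already ships the core on the cluster.
libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-streaming-scala" % "0.9.0" % "provided",
  "org.apache.flink" % "flink-connector-kafka" % "0.9.0"
)
```

Running `sbt assembly` then produces a single jar under target/ that can be submitted together with the job, avoiding the ClassNotFoundException for connector classes.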
I do the same trick as Wendong to avoid the sbt compilation error (excluding kafka_${scala.binary.version}). I still haven't managed to make sbt pass scala.binary.version to Maven.

Anwar.

On Mon, Jul 20, 2015 at 9:42 AM, Till Rohrmann <[hidden email]> wrote:
|
Why not try Maven instead?

On Mon, Jul 20, 2015 at 10:23 AM, Anwar Rizal <[hidden email]> wrote:
|
Coz I don't like it :-)

No, seriously: sure, I can do it with Maven, and it did indeed work with Maven. But the rest of our ecosystem uses sbt. That's why.

-Anwar

On Mon, Jul 20, 2015 at 10:28 AM, Till Rohrmann <[hidden email]> wrote:
|
For other issues (Hadoop versions), we used a shell script that did a search-and-replace on the variables. Maybe you can do the same trick here...

On Mon, Jul 20, 2015 at 10:37 AM, Anwar Rizal <[hidden email]> wrote:
|
In reply to this post by Till Rohrmann
Hi Till,
Thanks for your suggestion! I built a fat jar and the ClassNotFoundException runtime error was finally gone. I wish I had tried a fat jar earlier; it would have saved me 4 days.

Wendong
Glad to hear that it finally worked :-)

On Tue, Jul 21, 2015 at 2:21 AM, Wendong <[hidden email]> wrote:

> Hi Till,