Flink Kafka example in Scala

19 messages

Flink Kafka example in Scala

Wendong
Hello,

Does anyone have a simple example of Flink Kafka written in Scala?

I've been struggling to get my test program working. Below is my program, which has an error in addSink (the KafkaWordCountProducer part is copied from a Spark sample program):

import java.util.HashMap
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}

import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
      .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
      .addSink(new KafkaSink[String]("localhost:2181", "test", new SimpleStringSchema))
      .print

    println("Hi TestKafka")
    env.execute("Test Kafka")
  }
}

object KafkaWordCountProducer {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Kafka producer properties (bootstrap servers and serializers)
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send some messages
    while(true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt)
          .map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")

        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }

      Thread.sleep(1000)
    }
  }
}

There is compilation error:

[error] .................TestKafka.scala:15: type mismatch;
[error]  found   : org.apache.flink.streaming.util.serialization.SimpleStringSchema
[error]  required: org.apache.flink.streaming.util.serialization.SerializationSchema[String,Array[Byte]]
[error]       .addSink(new KafkaSink[String]("localhost:2181", "test", new SimpleStringSchema))

I changed SimpleStringSchema to SerializationSchema, but it still doesn't work.

I am trying to transition from Spark to Flink, but Flink has far fewer samples than Spark. It would be very helpful if there were an example of KafkaWordCount in Scala similar to the one in Spark.

Thanks,

Wendong

Re: Flink Kafka example in Scala

Anwar Rizal
Have you tried to replace

import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._

With

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.api.KafkaSource
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

I agree with you that the examples in the user guide might underestimate the importance of imports. The import statements should be included in the examples.

Anwar.

Re: Flink Kafka example in Scala

Wendong
I tried, but got an error:

[error] TestKafka.scala:11: object scala is not a member of package org.apache.flink.streaming.api
[error] import org.apache.flink.streaming.api.scala._

So I switched back to my original import statements.

Now I changed SimpleStringSchema to JavaDefaultStringSchema in addSink(new KafkaSink(...)), and the compilation error went away.

The problem is that there is a runtime error:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
        at org.apache.flink.client.program.Client.run(Client.java:315)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: java.lang.RuntimeException: Data stream sinks cannot be copied
        at org.apache.flink.streaming.api.datastream.DataStreamSink.copy(DataStreamSink.java:43)
        at org.apache.flink.streaming.api.datastream.DataStreamSink.copy(DataStreamSink.java:30)
        at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1341)
        at org.apache.flink.streaming.api.datastream.DataStream.print(DataStream.java:1029)
...........

I googled the error message but didn't find useful information.

Can anyone shed some light?

Thanks!

Wendong

Re: Flink Kafka example in Scala

Anwar Rizal
The compilation error is because you haven't declared a dependency on flink-streaming-scala.
In sbt, you add something like:
libraryDependencies += "org.apache.flink" % "flink-streaming-scala" % "0.9.0"
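
For completeness, a minimal build.sbt for this thread's example might look like the following (the versions are taken from this thread; treat the exact module names and Scala version as assumptions for the 0.9 line):

```scala
// Minimal build.sbt sketch for the TestKafka example in this thread.
// Versions and module names are assumptions based on the 0.9.0 release.
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-streaming-scala"  % "0.9.0",  // needed for the Scala DataStream API imports
  "org.apache.flink" % "flink-connector-kafka"  % "0.9.0"   // KafkaSource / KafkaSink
)
```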


Re: Flink Kafka example in Scala

Aljoscha Krettek
Hi,
your first example doesn't work because SimpleStringSchema does not work for sinks. You can use this modified serialization schema: https://gist.github.com/aljoscha/e131fa8581f093915582. It works for both source and sink (I think the current SimpleStringSchema is not correct and should be changed in the next release).
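
The rough shape of such a schema is a class implementing both directions (the method names below are assumptions based on the Flink 0.9-era interfaces; the gist above is authoritative):

```scala
import org.apache.flink.streaming.util.serialization.{DeserializationSchema, SerializationSchema}

// Sketch: one schema usable by both KafkaSource and KafkaSink.
// Interface details are assumptions; check the linked gist for the real code.
class MySimpleStringSchema
    extends SerializationSchema[String, Array[Byte]]
    with DeserializationSchema[String] {

  // Sink side: String -> bytes
  override def serialize(element: String): Array[Byte] =
    element.getBytes("UTF-8")

  // Source side: bytes -> String
  override def deserialize(message: Array[Byte]): String =
    new String(message, "UTF-8")

  // Never treat a record as end-of-stream
  override def isEndOfStream(nextElement: String): Boolean = false
}
```

Plus whatever type-information method the interface version you compile against requires.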

Cheers,
Aljoscha

Re: Flink Kafka example in Scala

Wendong
Thanks! I tried your updated MySimpleStringSchema and it works for both source and sink.

However, my problem is the runtime error "Data stream sinks cannot be copied" as listed in my previous post. I hope someone has run into this problem before and can give a hint.

Wendong

Re: Flink Kafka example in Scala

Gyula Fóra
Hey,
You are getting that error because you are calling print after adding a sink, which is an invalid operation.
Remove either addSink or print :)
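
In code that means not chaining anything after addSink; a sketch of the fixed pipeline, reusing the constructors from the original post:

```scala
import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._

// Sketch (Flink 0.9-era API, as used in this thread): addSink is terminal,
// so nothing can be chained after it. End the pipeline with ONE of the two.
object TestKafkaFixed {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream = env
      .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))

    stream.print  // option 1: print to stdout
    // option 2: write back to Kafka (with a schema that works for sinks):
    // stream.addSink(new KafkaSink[String]("localhost:2181", "test", new SimpleStringSchema))

    env.execute("Test Kafka")
  }
}
```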
Cheers,
Gyula

Re: Flink Kafka example in Scala

Wendong
Hi Gyula,

Cool. I removed .print and the error was gone.

However, env.execute failed with errors:

.........
Caused by: java.lang.Exception: Call to registerInputOutput() of invokable failed
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:504)
.......
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:185)
.......
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.api.KafkaSink
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)


In the following code:

    val stream = env
      .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
      .addSink(new KafkaSink[String]("localhost:2181", "test", new SimpleStringSchema))

Is anything wrong? I already have "import org.apache.flink.streaming.connectors.kafka.api._". The SimpleStringSchema class was modified (see previous post).

Thanks,

Wendong

Re: Flink Kafka example in Scala

Aljoscha Krettek

Hi, it looks like the flink-connector-kafka jar is not available where the job is running. Did you put it in Flink's lib folder on all the machines, or did you submit it with the job?


Re: Flink Kafka example in Scala

Till Rohrmann
These two links [1, 2] might help you get your job running. The first link describes how to set up a job using Flink's machine learning library, but it also works for the flink-connector-kafka library.

Cheers,
Till


Re: Flink Kafka example in Scala

Wendong
In reply to this post by Aljoscha Krettek
Hi Aljoscha,

Yes, the flink-connector-kafka jar file is under Flink lib directory:

  flink-0.9.0/lib/flink-connector-kafka-0.9.0.jar

and it shows that the KafkaSink class exists:
$ jar tf lib/flink-connector-kafka-0.9.0.jar | grep KafkaSink
org/apache/flink/streaming/connectors/kafka/api/KafkaSink.class

I cannot think of any reason why KafkaSink is not found:

.........
Caused by: java.lang.Exception: Call to registerInputOutput() of invokable failed
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:504)
.......
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:185)
.......
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.api.KafkaSink 
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)

Any clue?

Thanks,

Wendong

Re: Flink Kafka example in Scala

Wendong
In reply to this post by Till Rohrmann
Hi Till,

Thanks for the information. I'm using sbt and I have the following line in build.sbt:

libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "0.9.0" exclude("org.apache.kafka", "kafka_${scala.binary.version}")

Also, I copied flink-connector-kafka-0.9.0.jar under <flink_root_dir>/lib/, but I still get a ClassNotFoundException for KafkaSink.

I'd appreciate any suggestions.

Wendong

Re: Flink Kafka example in Scala

Till Rohrmann
Hi Wendong,

why do you exclude the Kafka dependency from `flink-connector-kafka`? Do you want to use your own Kafka version?

I'd recommend building a fat jar instead of trying to put the right dependencies in `/lib`. Here [1] you can see how to build a fat jar with sbt.

Cheers,
Till
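
With sbt, the usual tool for this is the sbt-assembly plugin. A rough sketch of the setup (the plugin version here is an assumption; the linked guide has the actual steps):

```scala
// project/plugins.sbt (plugin version is an assumption for the sbt 0.13 era)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

// build.sbt: keep the connector as a compile dependency so it is bundled
// into the job jar; then run `sbt assembly` and submit the fat jar.
libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "0.9.0"
```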


Re: Flink Kafka example in Scala

Anwar Rizal
I use the same trick as Wendong to avoid the sbt compilation error (excluding kafka_${scala.binary.version}).

I still haven't managed to make sbt pass scala.binary.version to Maven.

Anwar.

Re: Flink Kafka example in Scala

Till Rohrmann
Why not try Maven instead?

Re: Flink Kafka example in Scala

Anwar Rizal
Coz I don't like it :-)

No, seriously: sure, I can do it with Maven, and it did indeed work with Maven.
But the rest of our ecosystem uses sbt. That's why.

-Anwar

Re: Flink Kafka example in Scala

Stephan Ewen
For other issues (Hadoop versions), we used a shell script that did a search-and-replace on the variables.

Maybe you can do the same trick here...
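
As a sketch of that trick (the template file name and placeholder here are hypothetical), a few lines of shell are enough:

```shell
# Expand the scala.binary.version placeholder that sbt doesn't substitute,
# producing the real build.sbt before the build runs.
cat > build.sbt.template <<'EOF'
libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "0.9.0" exclude("org.apache.kafka", "kafka_${scala.binary.version}")
EOF

SCALA_BINARY_VERSION=2.10
sed "s/\${scala\.binary\.version}/${SCALA_BINARY_VERSION}/g" build.sbt.template > build.sbt

grep 'kafka_' build.sbt   # the exclude now names kafka_2.10 concretely
```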

Re: Flink Kafka example in Scala

Wendong
In reply to this post by Till Rohrmann
Hi Till,

Thanks for your suggestion! I built a fat jar and the ClassNotFoundException runtime error was finally gone. I wish I had tried a fat jar earlier; it would have saved me four days.

Wendong

Re: Flink Kafka example in Scala

Till Rohrmann
Glad to hear that it finally worked :-)
