Flink / Kafka unit testing with scalatest-embedded-kafka

Flink / Kafka unit testing with scalatest-embedded-kafka

Chauvet, Thomas

Hi,

 

I would like to "unit test" a Flink job with Kafka as source (and sink). I am trying to use the scalatest-embedded-kafka library to simulate Kafka for my test.

 

For example, I would like to read data (a string stream) from Kafka, convert it to uppercase, and put it into another topic.

 

For now, I am just trying to use Flink's Kafka consumer to read from a topic (with embedded Kafka).

 

Here is the example code:

 

```scala
import java.util.Properties

import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.scalatest.{Matchers, WordSpec}

import scala.util.Random

object SimpleFlinkKafkaTest {

  val kafkaPort = 9092
  val zooKeeperPort = 2181

  val groupId = Random.nextInt(1000000).toString

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("zookeeper.connect", "localhost:2181")
  props.put("auto.offset.reset", "earliest")
  props.put("group.id", groupId)
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val propsMap = Map(
    "bootstrap.servers" -> "localhost:9092",
    "zookeeper.connect" -> "localhost:2181",
    "auto.offset.reset" -> "earliest",
    "group.id" -> groupId,
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
  )

  val inputString = "mystring"
  val expectedString = "MYSTRING"
}

class SimpleFlinkKafkaTest extends WordSpec with Matchers with EmbeddedKafka {

  "runs with embedded kafka" should {

    "work" in {

      implicit val config = EmbeddedKafkaConfig(
        kafkaPort = SimpleFlinkKafkaTest.kafkaPort,
        zooKeeperPort = SimpleFlinkKafkaTest.zooKeeperPort,
        customConsumerProperties = SimpleFlinkKafkaTest.propsMap
      )

      withRunningKafka {

        publishStringMessageToKafka("input-topic", SimpleFlinkKafkaTest.inputString)

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        val kafkaConsumer = new FlinkKafkaConsumer011(
          "input-topic",
          new SimpleStringSchema,
          SimpleFlinkKafkaTest.props
        )

        val inputStream = env.addSource(kafkaConsumer)

        val outputStream = inputStream.map { msg =>
          msg.toUpperCase
        }

        outputStream.writeAsText("test.csv", WriteMode.OVERWRITE)

        env.execute()

        consumeFirstStringMessageFrom("output-topic") shouldEqual SimpleFlinkKafkaTest.expectedString
      }
    }
  }
}
```

 

The Flink process is running but nothing happens. I tried writing to a text file to see any output, but there is nothing in the file.

 

Any idea? Does anybody use this library to test a Flink job using Kafka?

 

Thanks in advance,

 

Thomas
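For reference, two things in the snippet above would keep the final assertion from ever passing even with a working broker: the job never produces anything to output-topic (it only writes a file), and env.execute() blocks indefinitely because the Kafka source is unbounded, so the line after it is never reached. A minimal sketch of the missing pieces, assuming FlinkKafkaProducer011 from the same connector and a plain background thread for the job (a sketch only, not verified against these exact library versions):

```scala
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}

object UppercaseJobSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val source = new FlinkKafkaConsumer011(
      "input-topic", new SimpleStringSchema, SimpleFlinkKafkaTest.props)

    // The sink that is missing from the original job: without it,
    // nothing is ever written to "output-topic".
    val sink = new FlinkKafkaProducer011[String](
      "localhost:9092", "output-topic", new SimpleStringSchema)

    env
      .addSource(source)
      .map(_.toUpperCase) // the transformation under test
      .addSink(sink)

    // env.execute() never returns for an unbounded Kafka source, so run
    // the job off the test thread and consume from the test thread instead.
    val jobThread = new Thread(new Runnable {
      override def run(): Unit = env.execute("uppercase-job")
    })
    jobThread.setDaemon(true)
    jobThread.start()

    // ...then, back on the test thread:
    // consumeFirstStringMessageFrom("output-topic") shouldEqual "MYSTRING"
  }
}
```

With this shape, the assertion at the end of the test has something to consume, provided it waits long enough for the job to come up.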

RE: Flink / Kafka unit testing with scalatest-embedded-kafka

Chauvet, Thomas

Hi,

 

withRunningKafka launches a Kafka broker. That is one of the advantages of this library.

 

I tested consuming/producing messages with the Kafka command line, and it seems alright.

 

Thanks

 

From: Ted Yu [mailto:[hidden email]]
Sent: Thursday, April 19, 2018 15:28
To: Chauvet, Thomas <[hidden email]>
Subject: Re: Flink / Kafka unit testing with scalatest-embedded-kafka

 

Looking at your code, the Kafka broker was not started.

 

Was there a running broker on localhost?

 

Cheers

 

On Thu, Apr 19, 2018 at 6:23 AM, Chauvet, Thomas <[hidden email]> wrote:


Re: Flink / Kafka unit testing with scalatest-embedded-kafka

Ted Yu
Pardon - I missed the implicit config (which is used by withRunningKafka).

Without your manual message production, was there any indication in the broker log that it received message(s)?

Thanks
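As a sanity check of the embedded broker itself, independent of any Flink job, the library's own helpers (the same publishStringMessageToKafka and consumeFirstStringMessageFrom used in the test above) can round-trip a message. A minimal sketch, assuming the same ports as above; the object name is illustrative:

```scala
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}

// Hypothetical standalone check: if this round trip works, the embedded
// broker is fine and the problem is on the Flink side of the test.
object BrokerSmokeCheck extends EmbeddedKafka {
  def main(args: Array[String]): Unit = {
    implicit val config = EmbeddedKafkaConfig(kafkaPort = 9092, zooKeeperPort = 2181)
    withRunningKafka {
      publishStringMessageToKafka("input-topic", "ping")
      // The broker should hand the same message straight back.
      val msg = consumeFirstStringMessageFrom("input-topic")
      assert(msg == "ping")
    }
  }
}
```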
