Hi,

I would like to "unit test" a Flink job with Kafka as source (and sink). I am trying to use the library scalatest-embedded-kafka to simulate Kafka for my test. For example, I would like to get data (a string stream) from Kafka, convert it to uppercase and put it into another topic. For now, I am just trying to use Flink's Kafka consumer to read from a topic (with embedded Kafka). Here is the code, for example:

```scala
import java.util.Properties

import org.apache.flink.streaming.api.scala._
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.scalatest.{Matchers, WordSpec}

import scala.util.Random

object SimpleFlinkKafkaTest {

  val kafkaPort = 9092
  val zooKeeperPort = 2181

  val groupId = Random.nextInt(1000000).toString

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("zookeeper.connect", "localhost:2181")
  props.put("auto.offset.reset", "earliest")
  props.put("group.id", groupId)
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val propsMap = Map(
    "bootstrap.servers" -> "localhost:9092",
    "zookeeper.connect" -> "localhost:2181",
    "auto.offset.reset" -> "earliest",
    "group.id" -> groupId,
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
  )

  val inputString = "mystring"
  val expectedString = "MYSTRING"
}

class SimpleFlinkKafkaTest extends WordSpec with Matchers with EmbeddedKafka {

  "runs with embedded kafka" should {

    "work" in {

      implicit val config = EmbeddedKafkaConfig(
        kafkaPort = SimpleFlinkKafkaTest.kafkaPort,
        zooKeeperPort = SimpleFlinkKafkaTest.zooKeeperPort,
        customConsumerProperties = SimpleFlinkKafkaTest.propsMap
      )

      withRunningKafka {

        publishStringMessageToKafka("input-topic", SimpleFlinkKafkaTest.inputString)

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        val kafkaConsumer = new FlinkKafkaConsumer011(
          "input-topic",
          new SimpleStringSchema,
          SimpleFlinkKafkaTest.props
        )

        val inputStream = env.addSource(kafkaConsumer)
        val outputStream = inputStream.map { msg => msg.toUpperCase }
        outputStream.writeAsText("test.csv", WriteMode.OVERWRITE)

        env.execute()

        consumeFirstStringMessageFrom("output-topic") shouldEqual SimpleFlinkKafkaTest.expectedString
      }
    }
  }
}
```

The Flink process is running but nothing happens. I tried writing the stream into a text file to see any output, but there is nothing in the file. Any idea? Does anybody use this library to test a Flink job using Kafka?

Thanks in advance,

Thomas
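Two things stand out in the test above: nothing ever produces to `output-topic` (the job only writes to a text file), and `env.execute()` blocks for as long as the unbounded Kafka source runs, so the assertion after it is never reached. Below is a minimal sketch of how the end of the `withRunningKafka` block could look, assuming Flink 1.4.x with the flink-connector-kafka-0.11 artifact on the classpath; the `FlinkKafkaProducer011` sink and the background `Future` are illustrative additions, not something from the thread:

```scala
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// 1) Publish the uppercased records to "output-topic"; the original job
//    only wrote to a text file, so the final assertion had nothing to read.
val kafkaProducer = new FlinkKafkaProducer011[String](
  "localhost:9092",       // broker list (the embedded broker)
  "output-topic",         // the topic the test asserts on
  new SimpleStringSchema  // write each String verbatim
)
outputStream.addSink(kafkaProducer)

// 2) env.execute() blocks for as long as the unbounded Kafka source runs,
//    so run the job on a background thread and let the test thread
//    consume the result.
Future { env.execute() }

consumeFirstStringMessageFrom("output-topic") shouldEqual
  SimpleFlinkKafkaTest.expectedString
```

Even with that, the assertion may need a retry or a longer consumer timeout, since the Flink job has to start up before anything lands on the output topic.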
Hi,

withRunningKafka launches a Kafka broker. This is one of the advantages of this library. I tested consuming and producing messages with the Kafka command line, and it seems alright.

Thanks

From: Ted Yu [mailto:[hidden email]]
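A self-contained round trip with the library's own helpers, with no Flink involved, is one way to demonstrate that the broker is live inside the block. This is just a sketch; the class name, topic name and ports are chosen arbitrarily:

```scala
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.{Matchers, WordSpec}

class EmbeddedKafkaSmokeTest extends WordSpec with Matchers with EmbeddedKafka {

  "the embedded broker" should {
    "round-trip a message without Flink" in {
      implicit val config = EmbeddedKafkaConfig(kafkaPort = 9092, zooKeeperPort = 2181)
      withRunningKafka {
        // The broker and ZooKeeper only live inside this block.
        publishStringMessageToKafka("smoke-topic", "ping")
        consumeFirstStringMessageFrom("smoke-topic") shouldEqual "ping"
      }
    }
  }
}
```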
Looking at your code, the Kafka broker was not started. Was there a running broker on localhost?

Cheers

On Thu, Apr 19, 2018 at 6:23 AM, Chauvet, Thomas <[hidden email]> wrote:
Pardon - I missed the implicit config (which is used by withRunningKafka). Without your manual message production, was there any indication in the broker log that it received the message(s)?

Thanks

On Thu, Apr 19, 2018 at 6:31 AM, Chauvet, Thomas <[hidden email]> wrote:
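One way to answer that without digging through the broker log is to consume from `input-topic` directly, before the Flink job is wired in. A sketch, where the sanity-check step is illustrative and not from the thread:

```scala
withRunningKafka {
  publishStringMessageToKafka("input-topic", SimpleFlinkKafkaTest.inputString)

  // Sanity check: the library reads with its own consumer (fresh group id,
  // auto.offset.reset = earliest), so this read does not remove the record
  // for the Flink consumer group that starts afterwards.
  consumeFirstStringMessageFrom("input-topic") shouldEqual
    SimpleFlinkKafkaTest.inputString

  // ... then build and run the Flink job as in the original test ...
}
```

If that assertion passes, the publish side and the broker are fine, and the problem is confined to the Flink job itself.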