Hi,

I have a streaming topic called "md" that displays test market data. I have written a simple program to stream data in via Kafka into Flink.

Flink version 1.5
Kafka version 2.12

This is the sample program in Scala that compiles OK in start-scala-shell.sh:

    import java.util.Properties
    import java.util.Arrays
    import org.apache.flink.api.common.functions.MapFunction
    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema
    import org.apache.flink.streaming.util.serialization.DeserializationSchema
    //import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    object Main {
      def main(args: Array[String]) {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "localhost:9092")
        properties.setProperty("zookeeper.connect", "localhost:2181")
        properties.setProperty("group.id", "sampleScala")
        val stream = env
          .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties))
          .print()
        env.execute("Flink Kafka Example")
      }
    }

    warning: there was one deprecation warning; re-run with -deprecation for details
    defined object Main

But I do not see any streaming output.

A naïve question: how do I execute the above compiled object in this shell?

Thanks,

Dr Mich Talebzadeh

Please try

    new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties).setStartFromEarliest()

and try again.

Cheers
Minglei.
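
For reference, the setStartFromEarliest() suggestion could be wired into the original program roughly as follows. This is only a sketch under the thread's own assumptions (topic "md", a broker on localhost:9092, Flink 1.5 with the Kafka 0.9 connector on the classpath); the object name EarliestOffsetSketch is made up for illustration.

```scala
import java.util.Properties

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

// Hypothetical object name; the settings below are copied from the original post.
object EarliestOffsetSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "sampleScala")

    // Build the consumer first so the start position can be set on it.
    val consumer =
      new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties)

    // Read the topic from its earliest available offset, so records
    // produced before the job started are also consumed.
    consumer.setStartFromEarliest()

    env.addSource(consumer).print()

    // Nothing is read until execute() is called.
    env.execute("Flink Kafka Example")
  }
}
```

Defining the object only compiles it. In a Scala REPL the job would then normally be started by calling the method explicitly, for example EarliestOffsetSketch.main(Array.empty[String]); whether that is the best approach inside the Flink shell is not confirmed in this thread.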

Hi Mich,

How did you set up your local Kafka cluster, and did you produce any messages to it? It seems you are using a standard local Kafka cluster setup for testing:

    "bootstrap.servers", "localhost:9092"
    "zookeeper.connect", "localhost:2181"

so you probably need to produce some data manually, for example using kafka-console-producer [1].

Another thing: since you are executing it in the Scala shell, it might be easier for you to do

    val stream = env
      .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties))
      .writeAsText("some_file_path")

so that the produced result won't get buried in a huge list of console output messages.

--
Rong

On Sat, Jun 30, 2018 at 8:06 AM zhangminglei <[hidden email]> wrote:
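
As a rough illustration of Rong's writeAsText suggestion, the sketch below sends the records to a file instead of the console. The output path /tmp/md_output.txt and the object name FileSinkSketch are placeholders invented for this example, and it assumes that test messages have already been produced to "md" (for instance with kafka-console-producer).

```scala
import java.util.Properties

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

// Hypothetical object name and output path; the Kafka settings come from the thread.
object FileSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "sampleScala")

    val consumer =
      new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties)

    env
      .addSource(consumer)
      // Write each record as one line of text rather than printing it.
      .writeAsText("/tmp/md_output.txt")
      // A sink parallelism of 1 is an assumption made here so that the
      // output ends up in a single file rather than one file per subtask.
      .setParallelism(1)

    env.execute("Flink Kafka to file")
  }
}
```

While the job runs, the file can be followed from another terminal, which keeps the records separate from the shell's own log output.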
