Hi, I have a streaming topic called "md" that displays test market data. I have written a simple program to stream data in via kafka into flinl. Flink version 1.5 Kafka version 2.12 This is the sample program in scala that compiles ok in start-scala-shell.sh import java.util.Properties import java.util.Arrays import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 import org.apache.flink.streaming.util.serialization.SimpleStringSchema import org.apache.flink.streaming.util.serialization.DeserializationSchema //import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.util.serialization.SimpleStringSchema object Main { def main(args: Array[String]) { val env = StreamExecutionEnvironment.getExecutionEnvironment val properties = new Properties() properties.setProperty("bootstrap.servers", "localhost:9092") properties.setProperty("zookeeper.connect", "localhost:2181") properties.setProperty("group.id", "sampleScala") val stream = env .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties)) .print() env.execute("Flink Kafka Example") } } warning: there was one deprecation warning; re-run with -deprecation for details defined object Main But I do not see any streaming output. A naïve question. How do I execute the above compiled object in this shell? Thanks Dr Mich Talebzadeh
LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
|
Please try new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties).setStartFromEarliest() and try again. Cheers Minglei.
|
Hi Mich, How did you setup your local Kafka cluster, did you produce any message to it? Seems like you are using a standard local Kafka cluster setup for testing: "bootstrap.servers", "localhost:9092" "zookeeper.connect", "localhost:2181" so probably you need to manually produce some data, probably using kafka-console-producer [1] Another thing is since you are executing it in the scala shell, it might be easier for you to do val stream = env .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties)) .writeAsText("some_file_path") so that the produced result won't get buried in a huge list of console output messages. -- Rong On Sat, Jun 30, 2018 at 8:06 AM zhangminglei <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |