Displaying topic data with Flink streaming

Mich Talebzadeh

Hi,

I have a streaming topic called "md" that displays test market data.

I have written a simple program to stream data via Kafka into Flink.

Flink version 1.5
Kafka version 2.12

This is the sample program in Scala, which compiles OK in start-scala-shell.sh:

import java.util.Properties
import java.util.Arrays
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.util.serialization.DeserializationSchema
//import org.apache.flink.streaming.api.scala._
object Main {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "sampleScala")
    val stream = env
                 .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties))
                 .print()
    env.execute("Flink Kafka Example")
  }
}


warning: there was one deprecation warning; re-run with -deprecation for details
defined object Main

But I do not see any streaming output. 

A naïve question. How do I execute the above compiled object in this shell?

Thanks

Dr Mich Talebzadeh

 

LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 


Re: Displaying topic data with Flink streaming

zhangminglei
Please use new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties).setStartFromEarliest() as the source and try again.
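
For readers following along, here is a minimal sketch of how that suggestion might look in the full program. It is an illustration under assumptions, not the poster's tested code: it assumes Flink 1.4 or later (for the non-deprecated SimpleStringSchema in org.apache.flink.api.common.serialization), uses the Scala streaming API import that was commented out in the original, and assumes the Kafka 0.9 connector jar is on the shell's classpath. The helper name kafkaProperties is made up for the example.

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09

object Main {
  // Kafka client settings copied from the original post.
  // kafkaProperties is a hypothetical helper name used only in this sketch.
  def kafkaProperties(): Properties = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "sampleScala")
    properties
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val consumer =
      new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), kafkaProperties())
    // Start from the earliest available offset rather than the
    // offsets committed for the consumer group.
    consumer.setStartFromEarliest()

    env.addSource(consumer).print()
    env.execute("Flink Kafka Example")
  }
}
```

Pasting this into the shell only defines the object ("defined object Main"); nothing runs until main is called explicitly, for example with Main.main(Array.empty[String]). The Scala shell also pre-binds a streaming environment as senv, which may be the more usual choice there; the sketch keeps the getExecutionEnvironment call from the original post.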

Cheers
Minglei.


In reply to the message from Mich Talebzadeh <[hidden email]> of 30 June 2018, 10:08 PM.



Re: Displaying topic data with Flink streaming

Rong Rong
Hi Mich,

How did you set up your local Kafka cluster, and did you produce any messages to it? It seems you are using a standard local Kafka cluster setup for testing:
    "bootstrap.servers", "localhost:9092"
    "zookeeper.connect", "localhost:2181"

so you probably need to produce some data manually, for example with kafka-console-producer [1].
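
As an alternative to the console producer, test messages could also be sent from a small Scala program. The following is only a sketch: it assumes the kafka-clients library is on the classpath and a broker is listening on localhost:9092 as in the original post. The object name ProduceTestData and the sample payloads are hypothetical, since the thread does not show the real "md" message format.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProduceTestData {
  // Hypothetical payloads; the actual market-data format is not given in the thread.
  val sampleMessages: Seq[String] = Seq("test-1", "test-2", "test-3")

  // Producer settings: same broker address as the consumer in the original post,
  // with plain string serializers for key and value.
  def producerProperties(): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props
  }

  def main(args: Array[String]): Unit = {
    val producer = new KafkaProducer[String, String](producerProperties())
    try {
      // Send each sample message to the "md" topic without a key.
      sampleMessages.foreach { msg =>
        producer.send(new ProducerRecord[String, String]("md", msg))
      }
      // Make sure buffered records are sent before closing.
      producer.flush()
    } finally {
      producer.close()
    }
  }
}
```

If the Flink job is already running, messages sent this way should show up in its output; if it is started afterwards, they are only read when the consumer starts from the earliest offset.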

Another thing: since you are executing it in the Scala shell, it might be easier to do
    val stream = env
                 .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties))
                 .writeAsText("some_file_path")
so that the produced result won't get buried in a huge list of console output messages.

--
Rong


On Sat, Jun 30, 2018 at 8:06 AM zhangminglei <[hidden email]> wrote:
Please try new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties).setStartFromEarliest() and try again.

Cheers
Minglei.

