Hi, I have written a simple test program as below import java.util.Properties import java.util.Arrays import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011 import org.apache.flink.streaming.util.serialization.SimpleStringSchema import org.apache.flink.streaming.util.serialization.DeserializationSchema import org.apache.flink.streaming.util.serialization.SimpleStringSchema import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecords import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.flink.core.fs.FileSystem import org.apache.flink.streaming.api import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.util.Collector import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.connectors.kafka object md_streaming { private var zookeeperUrl = "rhes75:2181" private var requestConsumerId = null private var impressionConsumerId = null private var clickConsumerId = null private var conversionConsumerId = null private var requestTopicName = null private var impressionTopicName = null private var clickTopicName = null private var conversionTopicName = null private var requestThreads = 0 private var impressionThreads = 0 private var clickThreads = 0 private var conversionThreads = 0 private var flinkAppName = "md_streaming" private var bootstrapServers = "rhes75:9092, rhes75:9093, rhes75:9094" private var schemaRegistryURL = "http://rhes75:8081" private var zookeeperConnect = "rhes75:2181" private var zookeeperConnectionTimeoutMs = "10000" private var rebalanceBackoffMS = "15000" private var zookeeperSessionTimeOutMs = "15000" private var autoCommitIntervalMS = "12000" private var topicsValue = "final" private var memorySet = "F" private var enableHiveSupport = null private var enableHiveSupportValue = "true" private var sparkStreamingReceiverMaxRateValue = "0" private var checkpointdir = "/checkpoint" private var hbaseHost = "rhes75" private var zookeeperHost = "rhes564" private var zooKeeperClientPort = "2181" private var writeDirectory = "hdfs://rhes75:9000/tmp/flink/" private var fileName = "md_streaming.txt" private val maxServingDelay = 60 private val servingSpeedFactor = 600f private var batchInterval = 2 private val countWindowLength = 4 // window size in sec private val countWindowFrequency = 2 // window trigger interval in sec private val earlyCountThreshold = 50 private val writeToElasticsearch = false // set to true to write results to Elasticsearch private val elasticsearchHost = "" // look-up hostname in Elasticsearch log output private val elasticsearchPort = 9300 def main(args: Array[String]) { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val properties = new Properties() properties.setProperty("bootstrap.servers", bootstrapServers) properties.setProperty("zookeeper.connect", zookeeperConnect) properties.setProperty("group.id", flinkAppName) properties.setProperty("auto.offset.reset", "latest") val myConsumer = env.addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties)) myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter()) env.addSource(myConsumer).print() //val sink = kafkaSource.writeAsText(writeDirectory+fileName, FileSystem.WriteMode.OVERWRITE) //env.execute("Flink Kafka Example writing to "+writeDirectory+fileName) env.execute("Flink simple output") } } However, when compiling I am getting the following errors [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:98: not found: type CustomWatermarkEmitter [error] myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter()) [error] ^ [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:99: type mismatch; [error] found : org.apache.flink.streaming.api.datastream.DataStreamSource[String] [error] required: org.apache.flink.streaming.api.functions.source.SourceFunction[?] [error] env.addSource(myConsumer).print() [error] ^ [error] two errors found [error] (compile:compileIncremental) Compilation failed [error] Total time: 3 s, completed Jul 29, 2018 9:31:05 AM Completed compiling Sun Jul 29 09:31:05 BST 2018 , Running in **Standalone mode** Could not build the program from JAR file. I don't see why it is failing. Appreciate any suggestions. Regards, Dr Mich Talebzadeh
LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
|
It seems that it's related with your development environment settings. On Sun, Jul 29, 2018 at 4:39 PM Mich Talebzadeh <[hidden email]> wrote:
Liu, Renjie Software Engineer, MVAD |
Hi Mich, These two mistakes are obvious. 1): The compiler can not find the definition of CustomWatermarkEmitter. Did you define it? Or import the dependency if it defines in other place? 2): The type of variable "myCustomer" is "DataStreamSource", but env.addSource method receive a source function. Actually, you have added the source with : val myConsumer = env.addSource(new FlinkKafkaConsumer011[String]( so, just : myConsumer.print() It should work. Thanks, vino. 2018-07-29 17:15 GMT+08:00 Renjie Liu <[hidden email]>:
|
Thanks, I'll check them out. Regards, Dr Mich Talebzadeh
LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
On Mon, 30 Jul 2018 at 01:08, vino yang <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |