Hi, I have written a simple test program as below import java.util.Properties import java.util.Arrays import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011 import org.apache.flink.streaming.util.serialization.SimpleStringSchema import org.apache.flink.streaming.util.serialization.DeserializationSchema import org.apache.flink.streaming.util.serialization.SimpleStringSchema import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecords import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.flink.core.fs.FileSystem import org.apache.flink.streaming.api import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.util.Collector import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.connectors.kafka object md_streaming { private var zookeeperUrl = "rhes75:2181" private var requestConsumerId = null private var impressionConsumerId = null private var clickConsumerId = null private var conversionConsumerId = null private var requestTopicName = null private var impressionTopicName = null private var clickTopicName = null private var 
conversionTopicName = null private var requestThreads = 0 private var impressionThreads = 0 private var clickThreads = 0 private var conversionThreads = 0 private var flinkAppName = "md_streaming" private var bootstrapServers = "rhes75:9092, rhes75:9093, rhes75:9094" private var schemaRegistryURL = "http://rhes75:8081" private var zookeeperConnect = "rhes75:2181" private var zookeeperConnectionTimeoutMs = "10000" private var rebalanceBackoffMS = "15000" private var zookeeperSessionTimeOutMs = "15000" private var autoCommitIntervalMS = "12000" private var topicsValue = "final" private var memorySet = "F" private var enableHiveSupport = null private var enableHiveSupportValue = "true" private var sparkStreamingReceiverMaxRateValue = "0" private var checkpointdir = "/checkpoint" private var hbaseHost = "rhes75" private var zookeeperHost = "rhes564" private var zooKeeperClientPort = "2181" private var writeDirectory = "hdfs://rhes75:9000/tmp/flink/" private var fileName = "md_streaming.txt" private val maxServingDelay = 60 private val servingSpeedFactor = 600f private var batchInterval = 2 private val countWindowLength = 4 // window size in sec private val countWindowFrequency = 2 // window trigger interval in sec private val earlyCountThreshold = 50 private val writeToElasticsearch = false // set to true to write results to Elasticsearch private val elasticsearchHost = "" // look-up hostname in Elasticsearch log output private val elasticsearchPort = 9300 def main(args: Array[String]) { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val properties = new Properties() properties.setProperty("bootstrap.servers", bootstrapServers) properties.setProperty("zookeeper.connect", zookeeperConnect) properties.setProperty("group.id", flinkAppName) properties.setProperty("auto.offset.reset", "latest") val myConsumer = env.addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), 
properties)) myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter()) env.addSource(myConsumer).print() //val sink = kafkaSource.writeAsText(writeDirectory+fileName, FileSystem.WriteMode.OVERWRITE) //env.execute("Flink Kafka Example writing to "+writeDirectory+fileName) env.execute("Flink simple output") } } However, when compiling I am getting the following errors [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:98: not found: type CustomWatermarkEmitter [error] myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter()) [error] ^ [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:99: type mismatch; [error] found : org.apache.flink.streaming.api.datastream.DataStreamSource[String] [error] required: org.apache.flink.streaming.api.functions.source.SourceFunction[?] [error] env.addSource(myConsumer).print() [error] ^ [error] two errors found [error] (compile:compileIncremental) Compilation failed [error] Total time: 3 s, completed Jul 29, 2018 9:31:05 AM Completed compiling Sun Jul 29 09:31:05 BST 2018 , Running in **Standalone mode** Could not build the program from JAR file. I don't see why it is failing. Appreciate any suggestions. Regards, Dr Mich Talebzadeh
It seems to be related to your development environment settings. On Sun, Jul 29, 2018 at 4:39 PM Mich Talebzadeh <[hidden email]> wrote:
Hi Mich, These two mistakes are obvious. 1): The compiler cannot find the definition of CustomWatermarkEmitter. Did you define it? If it is defined elsewhere, did you import it? 2): The type of the variable "myConsumer" is "DataStreamSource", but the env.addSource method expects a SourceFunction. You have actually already added the source with: val myConsumer = env.addSource(new FlinkKafkaConsumer011[String]( so just call: myConsumer.print() It should work. Thanks, vino. 2018-07-29 17:15 GMT+08:00 Renjie Liu <[hidden email]>:
Thanks, I'll check them out. Regards, Dr Mich Talebzadeh
On Mon, 30 Jul 2018 at 01:08, vino yang <[hidden email]> wrote:
