not found: type CustomWatermarkEmitter


Mich Talebzadeh
Hi,

I have written a simple test program, shown below:

import java.util.Properties
import java.util.Arrays
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.util.serialization.DeserializationSchema
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.connectors.kafka
object md_streaming
{
  private var zookeeperUrl = "rhes75:2181"
  private var requestConsumerId = null
  private var impressionConsumerId = null
  private var clickConsumerId = null
  private var conversionConsumerId = null
  private var requestTopicName = null
  private var impressionTopicName = null
  private var clickTopicName = null
  private var conversionTopicName = null
  private var requestThreads = 0
  private var impressionThreads = 0
  private var clickThreads = 0
  private var conversionThreads = 0
  private var flinkAppName = "md_streaming"
  private var bootstrapServers = "rhes75:9092, rhes75:9093, rhes75:9094"
  private var schemaRegistryURL = "http://rhes75:8081"
  private var zookeeperConnect = "rhes75:2181"
  private var zookeeperConnectionTimeoutMs = "10000"
  private var rebalanceBackoffMS = "15000"
  private var zookeeperSessionTimeOutMs = "15000"
  private var autoCommitIntervalMS = "12000"
  private var topicsValue = "final"
  private var memorySet = "F"
  private var enableHiveSupport = null
  private var enableHiveSupportValue = "true"
  private var sparkStreamingReceiverMaxRateValue = "0"
  private var checkpointdir = "/checkpoint"
  private var hbaseHost = "rhes75"
  private var zookeeperHost = "rhes564"
  private var zooKeeperClientPort = "2181"
  private var writeDirectory = "hdfs://rhes75:9000/tmp/flink/"
  private var fileName = "md_streaming.txt"
  private val maxServingDelay = 60
  private val servingSpeedFactor = 600f
  private var batchInterval = 2
  private val countWindowLength = 4 // window size in sec
  private val countWindowFrequency =  2   // window trigger interval in sec
  private val earlyCountThreshold = 50
  private val writeToElasticsearch = false // set to true to write results to Elasticsearch
  private val elasticsearchHost = "" // look-up hostname in Elasticsearch log output
  private val elasticsearchPort = 9300

  def main(args: Array[String])
  {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", bootstrapServers)
    properties.setProperty("zookeeper.connect", zookeeperConnect)
    properties.setProperty("group.id", flinkAppName)
    properties.setProperty("auto.offset.reset", "latest")
    
    val myConsumer = env.addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
 
    myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
    env.addSource(myConsumer).print()
    //val sink = kafkaSource.writeAsText(writeDirectory+fileName, FileSystem.WriteMode.OVERWRITE)
    //env.execute("Flink Kafka Example writing to "+writeDirectory+fileName)
    env.execute("Flink simple output")
  }
}


However, when compiling it I get the following errors:

[error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:98: not found: type CustomWatermarkEmitter
[error]     myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
[error]                                                  ^
[error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:99: type mismatch;
[error]  found   : org.apache.flink.streaming.api.datastream.DataStreamSource[String]
[error]  required: org.apache.flink.streaming.api.functions.source.SourceFunction[?]
[error]     env.addSource(myConsumer).print()
[error]                   ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed Jul 29, 2018 9:31:05 AM
Completed compiling
Sun Jul 29 09:31:05 BST 2018 , Running in **Standalone mode**
Could not build the program from JAR file.


I don't see why it is failing. I would appreciate any suggestions.

Regards,

Dr Mich Talebzadeh

 

LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 


Re: not found: type CustomWatermarkEmitter

Renjie Liu
It seems to be related to your development environment settings.


--
Liu, Renjie
Software Engineer, MVAD

Re: not found: type CustomWatermarkEmitter

vino yang
Hi Mich,

These two mistakes are obvious.

1) The compiler cannot find a definition of CustomWatermarkEmitter. Did you define it? If it is defined somewhere else, did you import it or add the dependency?
2) The variable "myConsumer" has type "DataStreamSource", but the env.addSource method expects a source function. You have already added the source with:

val myConsumer = env.addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))

so just call:

myConsumer.print()

It should work.
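
For illustration, here is a minimal sketch of both fixes together. The thread never shows CustomWatermarkEmitter, so the class below is hypothetical: it assumes each Kafka record is a line of the form "<epoch millis>,<payload>" and builds on the AssignerWithPunctuatedWatermarks interface that the program already imports. The topic, servers and group id are the values from the original program. Note that assignTimestampsAndWatermarks returns a new stream, so print() is chained onto its result rather than called on the raw source.

```scala
import java.util.Properties

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

import scala.util.Try

// Hypothetical implementation; the original class is not shown in the thread.
// ASSUMPTION: each record looks like "<epoch millis>,<payload>".
class CustomWatermarkEmitter extends AssignerWithPunctuatedWatermarks[String] {

  // Use the leading field as the event timestamp; fall back to the
  // timestamp Flink passes in when the record cannot be parsed.
  override def extractTimestamp(element: String, previousElementTimestamp: Long): Long =
    parseTimestamp(element).getOrElse(previousElementTimestamp)

  // Emit a watermark after every parseable record; returning null
  // tells Flink that no new watermark is available.
  override def checkAndGetNextWatermark(lastElement: String, extractedTimestamp: Long): Watermark =
    if (parseTimestamp(lastElement).isDefined) new Watermark(extractedTimestamp) else null

  private def parseTimestamp(record: String): Option[Long] =
    Try(record.split(",")(0).trim.toLong).toOption
}

object md_streaming {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "rhes75:9092, rhes75:9093, rhes75:9094")
    properties.setProperty("group.id", "md_streaming")
    properties.setProperty("auto.offset.reset", "latest")

    // addSource is called exactly once: it turns the Kafka SourceFunction into a stream.
    val stream = env.addSource(
      new FlinkKafkaConsumer011[String]("final", new SimpleStringSchema(), properties))

    // Print the stream that carries the assigned timestamps and watermarks.
    stream.assignTimestampsAndWatermarks(new CustomWatermarkEmitter()).print()

    env.execute("Flink simple output")
  }
}
```

If your records carry the timestamp in a different position or format, only parseTimestamp needs to change.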

Thanks, vino.



Re: not found: type CustomWatermarkEmitter

Mich Talebzadeh
Thanks, I'll check them out. 

Regards,

Dr Mich Talebzadeh

 
