Using FlinkKafkaConsumer API


Vishwas Siravara
I am using the Flink Kafka connector, and this is my dependency:

"org.apache.flink" %% "flink-connector-kafka" % "1.7.0",


When I look at my dependency tree, the Kafka client version is

 -org.apache.kafka:kafka-clients:2.0.1 which comes from the above package. 


However, when I run my code in the cluster, I see that the kafka-clients version that is loaded is

 0.10.2.0


Here is the task executor log : 

2019-09-09 03:05:56,825 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka version : 0.10.2.0

I am struggling to find out where this dependency is coming from. Our broker version is not
compatible with this client. How can I force Flink to use 2.0.1?

Also, the API I use for the Kafka consumer is:

  private[flink] def sourceType: FlinkKafkaConsumer[GenericRecord] = {
    val consumer = new FlinkKafkaConsumer[GenericRecord](
      source.asJava,
      AvroDeserialization.genericRecd,
      ExecutionEnv.streamProperties)
    consumer
  }

}

I would really appreciate any help. Is there any way I can find out where this dependency comes from in the cluster? It is clearly not coming from my application.
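
One thing that might help narrow this down is asking the JVM which jar a class was actually loaded from. The following is only a minimal sketch using the standard `getProtectionDomain` / `getCodeSource` calls. The `WhichJar` object and `locationOf` helper are hypothetical names, not part of Flink or Kafka. It assumes the check runs inside the job (for example from the job's `main` or an operator), so that it sees the same class loader as the consumer:

```scala
import scala.util.Try

// Hypothetical helper, not part of Flink or Kafka.
object WhichJar {

  /** Returns the location (usually a jar URL) a class was loaded from,
    * or None if the class cannot be found or the JVM reports no code source
    * (which is the case for JDK bootstrap classes).
    */
  def locationOf(className: String): Option[String] =
    for {
      cls    <- Try(Class.forName(className)).toOption
      domain <- Option(cls.getProtectionDomain)
      source <- Option(domain.getCodeSource)
      url    <- Option(source.getLocation)
    } yield url.toString

  def main(args: Array[String]): Unit = {
    // AppInfoParser is the class that logged "Kafka version : 0.10.2.0".
    val name = "org.apache.kafka.common.utils.AppInfoParser"
    val where = locationOf(name).getOrElse("<not found or unknown>")
    println(s"$name loaded from: $where")
  }
}
```

Logging `WhichJar.locationOf("org.apache.kafka.common.utils.AppInfoParser")` from the running job should show whether the class comes from the application jar or from some other jar on the cluster classpath.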


Thanks,
Vishwas