public class MyKafkaSerializationSchema implements KafkaSerializationSchema<Tuple2<String, String>> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, String> o, @Nullable Long aLong) {
        return new ProducerRecord<>(o.f0, o.f1.getBytes(StandardCharsets.UTF_8));
    }
}

FlinkKafkaProducer<Tuple2<String, String>> producer = new FlinkKafkaProducer<Tuple2<String, String>>(
        "default", new MyKafkaSerializationSchema(),
        prop2, Semantic.EXACTLY_ONCE);

But there's an error when running:

java.lang.AbstractMethodError: com.geekplus.flinketl.schema.MyKafkaSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;

Any suggestions on this?

Thanks,
Lei |
Hi,
It would be helpful if you could provide the full stack trace. What Flink version and which Kafka connector version are you using?

It sounds like either a dependency convergence error (mixing Kafka dependencies/various versions of flink-connector-kafka inside a single job/jar) or some shading issue. Can you check your project for such issues (`mvn dependency:tree` command [1])?

Also, what's a bit suspicious to me is the return type:

> Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;

I'm not sure, but I was not aware that we shade the Kafka dependency in our connectors. Are you manually shading something?

Piotrek

[1] https://maven.apache.org/plugins/maven-dependency-plugin/examples/resolving-conflicts-using-the-dependency-tree.html
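As a sketch of the check Piotrek suggests (run from the job's project directory; the artifact filters are assumptions about what to look for, not output from this thread), the dependency tree can be narrowed to the Flink and Kafka artifacts so duplicate connector copies stand out:

```shell
# Hypothetical commands: list only the Kafka-related artifacts in the
# dependency tree, to spot multiple versions of flink-connector-kafka
# or both the DataStream and SQL connectors in one fat jar.
mvn dependency:tree -Dincludes='org.apache.flink:flink-*-kafka*'
mvn dependency:tree -Dincludes='org.apache.kafka'
```

The `-Dincludes` filter accepts `groupId:artifactId` patterns with `*` wildcards, so one run per pattern is enough to see every matching transitive dependency.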
|
I think what might be happening is that you're mixing dependencies from the flink-sql-connector-kafka and the proper flink-connector-kafka that should be used with the DataStream API. Could that be the case?

Best,
Aljoscha

On 25.05.20 19:18, Piotr Nowojski wrote:
> It sounds like either a dependency convergence error (mixing Kafka dependencies/various versions of flink-connector-kafka inside a single job/jar) or some shading issue.
|
Hi, wanglei

I think Aljoscha is right. Could you post your dependency list? The flink-connector-kafka dependency is used in DataStream applications, which is what you should use; flink-sql-connector-kafka is used in Table API & SQL applications. You should add only one of them, because the two dependencies will conflict.

Best,
Leonard Xu
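A minimal sketch of what Leonard describes (version and Scala suffix are assumptions, taken from the 1.10.0 jar mentioned later in the thread): declare exactly one Kafka connector in the pom, the DataStream one, and make sure flink-sql-connector-kafka does not appear anywhere in the tree.

```xml
<!-- Sketch: keep exactly one Kafka connector. For the DataStream API use
     flink-connector-kafka; do NOT also declare flink-sql-connector-kafka. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.10.0</version>
</dependency>
```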
|
It was because of a jar conflict, and I have fixed it. I had put flink-connector-kafka_2.11-1.10.0.jar in the Flink lib directory, while my project's pom file also had the flink-connector-kafka dependency and was built as a fat jar.

Thanks,
Lei
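One way to sketch the fix for this double-bundling (coordinates assumed to match the jar named above): when the connector jar already sits in flink/lib, marking the dependency as `provided` keeps the fat-jar build from packaging a second copy while still compiling against it.

```xml
<!-- Sketch: flink-connector-kafka_2.11-1.10.0.jar is already in flink/lib,
     so a provided scope stops the fat jar from bundling a duplicate copy. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>
```

The alternative is the reverse: remove the jar from flink/lib and let the fat jar carry the connector; either way, only one copy should be on the classpath at runtime.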
|