I have a custom serializer for writing/reading from kafka. I am setting
this up in main with code as follows: val kafkaConsumerProps = new Properties() kafkaConsumerProps.setProperty("bootstrap.servers", kafka_bootstrap) kafkaConsumerProps.setProperty("group.id",s"normalize-call-events-${scala.util.Random.nextInt}") kafkaConsumerProps.setProperty("client.id",s"normalize-call-events-${scala.util.Random.nextInt}") val source = new FlinkKafkaConsumer010[RaeEvent](sourceTopic, new KafkaRaeEventSerializer(schemaBaseDirectory), kafkaConsumerProps) This generates java.lang.NoClassDefFoundError on classes that are in my job jar. Printing the classpath doesn't show the libraries explicitly (but these are also not shown explicitly in place where they are found; I guess the current jar is now shown on the classpath). I don't know how to list the current classloaders. Also, the error goes away when I add the dependency to /flink/lib and restart flink. Hence my conjecture that in the kafka serializer/deserializer context the depenencies from my job jar are not available. Flink version 1.2.0 Any help greatly appreciated; also I'll be happy to provide additional info. Also greatly appreciated where I should have looked in the flink code to decide the answer myself. - bart |
Hi Bart,
usually, this error means that your Maven project configuration is not correct. Is your custom class included in the jar file that you submit to the cluster? It might make sense to share your pom.xml with us. Regards, Timo Am 11/29/17 um 2:44 PM schrieb Bart Kastermans: > I have a custom serializer for writing/reading from kafka. I am setting > this up in main with code as follows: > > val kafkaConsumerProps = new Properties() > kafkaConsumerProps.setProperty("bootstrap.servers", kafka_bootstrap) > kafkaConsumerProps.setProperty("group.id",s"normalize-call-events-${scala.util.Random.nextInt}") > kafkaConsumerProps.setProperty("client.id",s"normalize-call-events-${scala.util.Random.nextInt}") > val source = new FlinkKafkaConsumer010[RaeEvent](sourceTopic, new > KafkaRaeEventSerializer(schemaBaseDirectory), > kafkaConsumerProps) > > This generates java.lang.NoClassDefFoundError on classes that are > in my job jar. Printing the classpath doesn't show the libraries > explicitly (but these are also not shown explicitly in place where they > are found; I guess the current jar is now shown on the classpath). I > don't know how to list the current classloaders. > > Also, the error goes away when I add the dependency to /flink/lib and > restart flink. Hence my conjecture that in the kafka > serializer/deserializer context the depenencies from my job jar are > not available. > > Flink version 1.2.0 > > Any help greatly appreciated; also I'll be happy to provide additional > info. > > Also greatly appreciated where I should have looked in the flink code to > decide the answer myself. > > - bart |
This issues sounds strikingly similar to FLINK-6965.
TL;DR: You must place classes loaded during serialization by the kafka connector under /lib. On 29.11.2017 16:15, Timo Walther wrote: > Hi Bart, > > usually, this error means that your Maven project configuration is not > correct. Is your custom class included in the jar file that you submit > to the cluster? > > It might make sense to share your pom.xml with us. > > Regards, > Timo > > > > Am 11/29/17 um 2:44 PM schrieb Bart Kastermans: >> I have a custom serializer for writing/reading from kafka. I am setting >> this up in main with code as follows: >> >> val kafkaConsumerProps = new Properties() >> kafkaConsumerProps.setProperty("bootstrap.servers", >> kafka_bootstrap) >> kafkaConsumerProps.setProperty("group.id",s"normalize-call-events-${scala.util.Random.nextInt}") >> kafkaConsumerProps.setProperty("client.id",s"normalize-call-events-${scala.util.Random.nextInt}") >> val source = new FlinkKafkaConsumer010[RaeEvent](sourceTopic, new >> KafkaRaeEventSerializer(schemaBaseDirectory), >> kafkaConsumerProps) >> >> This generates java.lang.NoClassDefFoundError on classes that are >> in my job jar. Printing the classpath doesn't show the libraries >> explicitly (but these are also not shown explicitly in place where they >> are found; I guess the current jar is now shown on the classpath). I >> don't know how to list the current classloaders. >> >> Also, the error goes away when I add the dependency to /flink/lib and >> restart flink. Hence my conjecture that in the kafka >> serializer/deserializer context the depenencies from my job jar are >> not available. >> >> Flink version 1.2.0 >> >> Any help greatly appreciated; also I'll be happy to provide additional >> info. >> >> Also greatly appreciated where I should have looked in the flink code to >> decide the answer myself. >> >> - bart > > > |
Free forum by Nabble | Edit this page |