Classpath for execution of KafkaSerializer/Deserializer; java.lang.NoClassDefFoundError while class in job jar

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Classpath for execution of KafkaSerializer/Deserializer; java.lang.NoClassDefFoundError while class in job jar

Bart Kastermans
I have a custom serializer for writing/reading from kafka.  I am setting
this up in main with code as follows:

    val kafkaConsumerProps = new Properties()
    kafkaConsumerProps.setProperty("bootstrap.servers", kafka_bootstrap)
    kafkaConsumerProps.setProperty("group.id",s"normalize-call-events-${scala.util.Random.nextInt}")
    kafkaConsumerProps.setProperty("client.id",s"normalize-call-events-${scala.util.Random.nextInt}")
    val source = new FlinkKafkaConsumer010[RaeEvent](sourceTopic, new
    KafkaRaeEventSerializer(schemaBaseDirectory),
      kafkaConsumerProps)

This generates java.lang.NoClassDefFoundError on classes that are
in my job jar.  Printing the classpath doesn't show the libraries
explicitly (but these are also not shown explicitly in place where they
are found; I guess the current jar is now shown on the classpath).  I
don't know how to list the current classloaders.

Also, the error goes away when I add the dependency to /flink/lib and
restart flink.  Hence my conjecture that in the kafka
serializer/deserializer context the depenencies from my job jar are
not available.

Flink version 1.2.0

Any help greatly appreciated; also I'll be happy to provide additional
info.

Also greatly appreciated where I should have looked in the flink code to
decide the answer myself.

- bart
Reply | Threaded
Open this post in threaded view
|

Re: Classpath for execution of KafkaSerializer/Deserializer; java.lang.NoClassDefFoundError while class in job jar

Timo Walther
Hi Bart,

usually, this error means that your Maven project configuration is not
correct. Is your custom class included in the jar file that you submit
to the cluster?

It might make sense to share your pom.xml with us.

Regards,
Timo



Am 11/29/17 um 2:44 PM schrieb Bart Kastermans:

> I have a custom serializer for writing/reading from kafka.  I am setting
> this up in main with code as follows:
>
>      val kafkaConsumerProps = new Properties()
>      kafkaConsumerProps.setProperty("bootstrap.servers", kafka_bootstrap)
>      kafkaConsumerProps.setProperty("group.id",s"normalize-call-events-${scala.util.Random.nextInt}")
>      kafkaConsumerProps.setProperty("client.id",s"normalize-call-events-${scala.util.Random.nextInt}")
>      val source = new FlinkKafkaConsumer010[RaeEvent](sourceTopic, new
>      KafkaRaeEventSerializer(schemaBaseDirectory),
>        kafkaConsumerProps)
>
> This generates java.lang.NoClassDefFoundError on classes that are
> in my job jar.  Printing the classpath doesn't show the libraries
> explicitly (but these are also not shown explicitly in place where they
> are found; I guess the current jar is now shown on the classpath).  I
> don't know how to list the current classloaders.
>
> Also, the error goes away when I add the dependency to /flink/lib and
> restart flink.  Hence my conjecture that in the kafka
> serializer/deserializer context the depenencies from my job jar are
> not available.
>
> Flink version 1.2.0
>
> Any help greatly appreciated; also I'll be happy to provide additional
> info.
>
> Also greatly appreciated where I should have looked in the flink code to
> decide the answer myself.
>
> - bart


Reply | Threaded
Open this post in threaded view
|

Re: Classpath for execution of KafkaSerializer/Deserializer; java.lang.NoClassDefFoundError while class in job jar

Chesnay Schepler
This issues sounds strikingly similar to FLINK-6965.

TL;DR: You must place classes loaded during serialization by the kafka
connector under /lib.

On 29.11.2017 16:15, Timo Walther wrote:

> Hi Bart,
>
> usually, this error means that your Maven project configuration is not
> correct. Is your custom class included in the jar file that you submit
> to the cluster?
>
> It might make sense to share your pom.xml with us.
>
> Regards,
> Timo
>
>
>
> Am 11/29/17 um 2:44 PM schrieb Bart Kastermans:
>> I have a custom serializer for writing/reading from kafka.  I am setting
>> this up in main with code as follows:
>>
>>      val kafkaConsumerProps = new Properties()
>>      kafkaConsumerProps.setProperty("bootstrap.servers",
>> kafka_bootstrap)
>> kafkaConsumerProps.setProperty("group.id",s"normalize-call-events-${scala.util.Random.nextInt}")
>> kafkaConsumerProps.setProperty("client.id",s"normalize-call-events-${scala.util.Random.nextInt}")
>>      val source = new FlinkKafkaConsumer010[RaeEvent](sourceTopic, new
>>      KafkaRaeEventSerializer(schemaBaseDirectory),
>>        kafkaConsumerProps)
>>
>> This generates java.lang.NoClassDefFoundError on classes that are
>> in my job jar.  Printing the classpath doesn't show the libraries
>> explicitly (but these are also not shown explicitly in place where they
>> are found; I guess the current jar is now shown on the classpath).  I
>> don't know how to list the current classloaders.
>>
>> Also, the error goes away when I add the dependency to /flink/lib and
>> restart flink.  Hence my conjecture that in the kafka
>> serializer/deserializer context the depenencies from my job jar are
>> not available.
>>
>> Flink version 1.2.0
>>
>> Any help greatly appreciated; also I'll be happy to provide additional
>> info.
>>
>> Also greatly appreciated where I should have looked in the flink code to
>> decide the answer myself.
>>
>> - bart
>
>
>