Classloader issue with UDFs in DataStreamSource

Classloader issue with UDFs in DataStreamSource

Edward
I need help debugging a problem with using user defined functions in my DataStreamSource code.

Here's the behavior:
The first time I upload my jar to the Flink cluster and submit the job, it runs fine.
For any subsequent runs of the same job, it's giving me a NoClassDefFound error on one of my UDF classes.
If I restart the Flink cluster, it works again, but only the first time I submit the job.

I am using a customized KafkaAvroDeserializer where the reader schema is different from the writer schema (and where that reader schema is a generated Avro class included in my uploaded jar file). If I change my code to use the standard KafkaAvroDeserializer (i.e., no UDFs in the DataStreamSource), it works fine, even though UDFs are used in other steps of the job, so the problem seems specific to the DataStreamSource step.
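
For context, the kind of customization described above could look roughly like the sketch below. This is only an illustration, not the actual job code: the registry URL, config values, and the helper wrapper are assumptions; only LookbackMessageKey comes from the stack trace further down. Confluent's deserializer, configured with specific.avro.reader=true, decodes straight into the generated reader-schema class, so that class has to be loadable wherever the record is deserialized or its type is analysed.

import java.util.HashMap;
import java.util.Map;

import com.smartertravel.pbr.datapipeline.entities.LookbackMessageKey;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

public class SpecificReaderSketch {

    // Hypothetical helper; registry URL and config values are assumptions.
    static LookbackMessageKey deserializeKey(String topic, byte[] keyBytes) {
        Map<String, Object> config = new HashMap<>();
        config.put("schema.registry.url", "http://schema-registry:8081");
        // Decode into the generated SpecificRecord class (the reader schema)
        // instead of returning a GenericRecord carrying the writer schema.
        config.put("specific.avro.reader", true);

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(config, true);  // true = key deserializer

        // Because the generated class is the produced type, Flink's TypeExtractor
        // has to load LookbackMessageKey (and its nested Builder) from the user
        // jar when it builds serializers for the source.
        return (LookbackMessageKey) deserializer.deserialize(topic, keyBytes);
    }
}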

Why would the classloader here not have access to all classes in my uploaded jar file, while the classloader used in subsequent steps does have access to that jar file? Why would it work fine the first time I upload the jar via the Flink Dashboard, but not on subsequent executions?

Here's the exception I'm seeing:

2017-08-28 13:37:24.258 [flink-akka.actor.default-dispatcher-16] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source -> Sink: Unnamed (3/5) (287369765ce9d6bab4fce7c284bf4d4c) switched from RUNNING to FAILED.
java.lang.NoClassDefFoundError: com/smartertravel/pbr/datapipeline/entities/LookbackMessageKey$Builder
        at java.lang.Class.getDeclaredMethods0(Native Method)
        at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
        at java.lang.Class.getDeclaredMethods(Class.java:1975)
        at org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods(TypeExtractionUtils.java:243)
        at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1949)
        at org.apache.flink.api.java.typeutils.AvroTypeInfo.generateFieldsFromAvroSchema(AvroTypeInfo.java:55)
        at org.apache.flink.api.java.typeutils.AvroTypeInfo.<init>(AvroTypeInfo.java:48)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1810)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1716)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:953)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:814)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:768)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:764)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createSubclassSerializer(PojoSerializer.java:1129)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.getSubclassSerializer(PojoSerializer.java:1122)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:253)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.smartertravel.pbr.datapipeline.entities.LookbackMessageKey$Builder
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 35 common frames omitted



Re: Classloader issue with UDFs in DataStreamSource

Edward
In case anyone else runs into this, here's what I discovered:

For whatever reason, the classloader used by
org.apache.flink.api.java.typeutils.TypeExtractor did not have access to the
classes in my udf.jar file. However, once I changed my
KeyedDeserializationSchema implementation to use standard Avro classes (i.e.,
GenericRecord rather than a SpecificRecord), the classloader no longer needed
any of the generated Avro classes in udf.jar during the ExecutionGraph stage.

At execution time, my deserializer forced the returned GenericRecord into
my custom Avro SpecificRecord class, which was available to the classloader
at that point.
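
In code, that workaround might look roughly like the sketch below. It is a hedged reading of the description above, not the poster's actual class: the LookbackDeserializationSchema name, the schema-registry configuration, and the LookbackMessage value class (standing in for the actual generated value type) are assumptions; only LookbackMessageKey appears in the stack trace. The stock KafkaAvroDeserializer hands back GenericRecords, and the copy into the generated SpecificRecord classes happens inside deserialize(), i.e. at execution time on the task's user-code classloader.

import java.io.IOException;
import java.util.Collections;

import com.smartertravel.pbr.datapipeline.entities.LookbackMessageKey;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

// LookbackMessage below is a placeholder for the actual generated value class.
public class LookbackDeserializationSchema
        implements KeyedDeserializationSchema<Tuple2<LookbackMessageKey, LookbackMessage>> {

    private final String schemaRegistryUrl;
    // Lazily created: KafkaAvroDeserializer is not serializable, but this schema
    // object is serialized and shipped to the task managers with the job.
    private transient KafkaAvroDeserializer keyDeserializer;
    private transient KafkaAvroDeserializer valueDeserializer;

    public LookbackDeserializationSchema(String schemaRegistryUrl) {
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    @Override
    public Tuple2<LookbackMessageKey, LookbackMessage> deserialize(
            byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException {
        if (keyDeserializer == null) {
            keyDeserializer = new KafkaAvroDeserializer();
            keyDeserializer.configure(
                    Collections.singletonMap("schema.registry.url", schemaRegistryUrl), true);
            valueDeserializer = new KafkaAvroDeserializer();
            valueDeserializer.configure(
                    Collections.singletonMap("schema.registry.url", schemaRegistryUrl), false);
        }
        // Without specific.avro.reader, the stock deserializer returns GenericRecords,
        // so no generated classes are needed up to this point.
        GenericRecord genericKey = (GenericRecord) keyDeserializer.deserialize(topic, messageKey);
        GenericRecord genericValue = (GenericRecord) valueDeserializer.deserialize(topic, message);
        // Copy into the generated SpecificRecord classes here, at execution time,
        // when the user-code classloader has udf.jar available.
        LookbackMessageKey key = (LookbackMessageKey)
                SpecificData.get().deepCopy(LookbackMessageKey.getClassSchema(), genericKey);
        LookbackMessage value = (LookbackMessage)
                SpecificData.get().deepCopy(LookbackMessage.getClassSchema(), genericValue);
        return Tuple2.of(key, value);
    }

    @Override
    public boolean isEndOfStream(Tuple2<LookbackMessageKey, LookbackMessage> nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Tuple2<LookbackMessageKey, LookbackMessage>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<LookbackMessageKey, LookbackMessage>>() {});
    }
}

The transient fields with lazy initialization are there because the deserialization schema instance is serialized with the job graph and shipped to the task managers, while Confluent's KafkaAvroDeserializer itself is not serializable.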


