Hey,
I'm trying to run a job which uses a dynamically generated class (through Byte Buddy). Think of me having a complex schema as YAML text and generating a class from it. Throughout the job I am using an artificial super class (MySuperClass) of the generated class (for example, I need to specify the generic class to extend RichMapFunction).

MyRichMapFunction<Y extends MySuperClass> extends RichMapFunction<Row, Y> is what introduces the dynamic class. It takes the YAML in its constructor, and:

1. open - takes the schema and converts it into a POJO class which extends MySuperClass
2. getProducedType - does the same thing in order to correctly send the POJO with all the right fields

So basically my job is something like:

    env.addSource([stream of pojos])
        .filter(...)
        ... (register table, running a query which generates Rows)
        .map(myRichMapFunction)
        .returns(myRichMapFunction.getProducedType)
        .addSink(...)

My trouble now is that, when running on a cluster, the classloader fails to load my generated class. I tried to use getRuntimeContext().getUserCodeClassLoader() as the loader for Byte Buddy, but that doesn't seem to be enough.

I was reading about it here:
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html

Is there maybe a hook that gets called when a job is loaded, so I can load the class?

Stacktrace:

org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com....model.MyGeneratedClass
ClassLoader info: URL ClassLoader:
    file: '/var/folders/f7/c4pvjrf902b6c73_tbzkxnjw0000gn/T/blobStore-4b685b0a-b8c1-43a1-a75d-f0b9c0156f4c/job_d1187ea7e783007b92ef6c0597d72fcb/blob_p-38b9e6dce2423b0374f82842a35dcaa92e10dedd-6f1056ab61afcccb3c1fca895ccb3eb0' (valid JAR)
Class not resolvable through given classloader.
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:264)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
    at com.....MainClass.main(MainClass.java:46)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:785)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:279)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.....model.DynamicSchema
ClassLoader info: URL ClassLoader:
    file: '/var/folders/f7/c4pvjrf902b6c73_tbzkxnjw0000gn/T/blobStore-4b685b0a-b8c1-43a1-a75d-f0b9c0156f4c/job_d1187ea7e783007b92ef6c0597d72fcb/blob_p-38b9e6dce2423b0374f82842a35dcaa92e10dedd-6f1056ab61afcccb3c1fca895ccb3eb0' (valid JAR)
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:273)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)

-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Hi shkob,

> i tried to use getRuntimeContext().getUserCodeClassLoader() as the loader to use for Byte Buddy - but doesnt seem to be enough.

From the log, it seems that the user class cannot be found by the classloader:

    Cannot load user class: com....model.MyGeneratedClass

Have you tried Thread.currentThread().getContextClassLoader()? It should be the user-code ClassLoader.

Best, Hequn

On Tue, Oct 23, 2018 at 5:47 AM shkob1 <[hidden email]> wrote:
> Hey,

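The lookup suggested here can be sketched in plain Java. This is only an illustration under assumptions: the class name `ContextLoaderLookup` and the method `loadUserClass` are made up for this note, and inside a Flink rich function the fallback loader would presumably be the one returned by getRuntimeContext().getUserCodeClassLoader(), as mentioned earlier in the thread.

```java
public class ContextLoaderLookup {

    /**
     * Tries the current thread's context class loader first and, if the class
     * is not visible there, falls back to the loader passed in by the caller.
     * (Hypothetical helper, not part of Flink or Byte Buddy.)
     */
    public static Class<?> loadUserClass(String className, ClassLoader fallback)
            throws ClassNotFoundException {
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        if (context != null) {
            try {
                return Class.forName(className, false, context);
            } catch (ClassNotFoundException ignored) {
                // Not visible through the context loader; try the fallback below.
            }
        }
        // A null fallback means "bootstrap loader" for Class.forName.
        return Class.forName(className, false, fallback);
    }

    public static void main(String[] args) throws Exception {
        Class<?> found = loadUserClass("java.util.ArrayList", null);
        System.out.println("Loaded " + found.getName());
    }
}
```

Note that a lookup by name only succeeds if the generated class was actually defined in (or is reachable from) one of those loaders; it does not by itself make a class generated on the client visible on the cluster.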
I have, with no luck. I wonder though: do I need to load it only in the map function? I also tried loading it in the open method of the sink function and of the process function I have there, because they still use the type. Still no good.
Is there a way of knowing which operator fails?
From: Hequn Cheng <[hidden email]>
Sent: Monday, October 22, 2018 6:33:52 PM
To: Shahar Cizer Kobrinsky
Cc: user
Subject: Re: Dynamically Generated Classes - Cannot load user class

After removing some operators (which I still need, but I wanted to understand where my issues are) I get a slightly different stacktrace (though still the same issue). My current operators are:

1. a SQL select with group by (returns a retract stream <Boolean, Row>)
2. filter (take only non-retracted)
3. map (tuple to Row)
4. map (Row to MyGeneratedClass; this one loads the generated class through the classloader in open())

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
    at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:398)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createStreamRecordWriters(StreamTask.java:1165)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:214)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:193)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:51)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1445)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:680)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.....model.MyGeneratedClass
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:76)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
    at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1716)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1556)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:561)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.readObject(PojoSerializer.java:1038)
    at sun.reflect.GeneratedMethodAccessor37.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at java.util.ArrayList.readObject(ArrayList.java:797)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at java.util.ArrayList.readObject(ArrayList.java:797)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at java.util.ArrayList.readObject(ArrayList.java:797)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at java.util.ArrayList.readObject(ArrayList.java:797)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at java.util.ArrayList.readObject(ArrayList.java:797)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:502)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:489)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:477)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:438)
    at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:395)
    ... 11 more

According to the logs, the Task itself contains all of those operators:

2018-10-23 11:15:19,234 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: groupBy: (...), select: (...) -> to: Tuple2 -> Filter -> Map -> my-query (1/1) (06b471ad99d77f0449c30641bed7cc88) [DEPLOYING].

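The failure in this trace happens while a serialized object graph is read back and a class name is resolved through one specific class loader (the resolveClass and Class.forName frames). The following plain-JDK sketch reproduces that mechanism in isolation. It is not Flink's code: `ResolveClassDemo`, `Payload` and `LoaderObjectInputStream` are names invented for this illustration, and a parentless loader stands in for a user-code loader whose JAR does not contain the generated class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class ResolveClassDemo {

    /** Stand-in for a class that exists where the job graph is built but not in the job JAR. */
    public static class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        public String name = "generated";
    }

    /** ObjectInputStream that resolves every class name through one given loader. */
    static class LoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            // Simplified: primitive type descriptors are not handled in this sketch.
            return Class.forName(desc.getName(), false, loader);
        }
    }

    public static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Returns true if the bytes can be read back using only the given loader. */
    public static boolean canDeserialize(byte[] data, ClassLoader loader) throws IOException {
        try (ObjectInputStream in =
                new LoaderObjectInputStream(new ByteArrayInputStream(data), loader)) {
            in.readObject();
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = serialize(new Payload());
        ClassLoader visible = ResolveClassDemo.class.getClassLoader();
        ClassLoader isolated = new ClassLoader(null) {}; // sees bootstrap classes only
        System.out.println("visible loader:  " + canDeserialize(data, visible));
        System.out.println("isolated loader: " + canDeserialize(data, isolated));
    }
}
```

The point of the sketch is the timing: resolution by name happens during deserialization, before any user method such as open() could run and define the class.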
Update on this: if I just do an empty mapping and drop the SQL part, it works just fine. I wonder if there's any class loading that needs to be done when using SQL; I'm not sure how I would do that.

Thinking out loud here:

I can't tell where the class load is failing. The general model I've used with ByteBuddy in this scenario is very similar to yours. I subclass my superclass using ByteBuddy. I inject the new class into a JAR that will be shared by the task managers. I subclass the Flink classes such as RichMapFunction so that the template parameters are accepted. I would have to be careful in my subclassed functions not to instantiate members that could not be serialized. This, too, gets injected into the JAR. I would then instantiate all the classes and send them to the task managers.

The trick to getting the classes to load on the task manager was to make sure all the JARs that they needed to reference were available. This would include my temporary JAR containing the ByteBuddy classes as well as any JARs that weren't automatically distributed by Flink to the task managers, but which were otherwise needed.

It's not clear from your note whether, in dropping SQL, your ByteBuddy functions are getting called. The last stack trace you produced indicates a problem other than finding or loading ByteBuddy objects.

Can you get the job to run without using Streams (i.e., use Batch)? Can you get the job to run without using ByteBuddy (i.e., just execute the SQL, then use a hard-coded POJO, then try adding ByteBuddy)?

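The "inject the class into a JAR" step described above can be sketched with the JDK alone. This is a minimal illustration under assumptions: `GeneratedClassJar` and its methods are invented names, the bytes of an ordinary compiled class (`Sample`) stand in for bytes that Byte Buddy would produce, and how the resulting JAR is made available to the task managers is left out.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;

public class GeneratedClassJar {

    /** Stand-in for a generated class; in the thread the bytes would come from Byte Buddy. */
    public static class Sample {
        public String hello() {
            return "hello";
        }
    }

    /** Reads the class-file bytes of an already loaded class from its loader's resources. */
    public static byte[] classBytes(Class<?> type) throws IOException {
        String resource = type.getName().replace('.', '/') + ".class";
        try (InputStream in = type.getClassLoader().getResourceAsStream(resource)) {
            if (in == null) {
                throw new IOException("No class file found for " + type.getName());
            }
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        }
    }

    /** Writes a single class file into a new temporary JAR and returns its path. */
    public static Path writeJar(String className, byte[] bytes) throws IOException {
        Path jar = Files.createTempFile("generated-classes", ".jar");
        try (OutputStream file = Files.newOutputStream(jar);
             JarOutputStream out = new JarOutputStream(file)) {
            out.putNextEntry(new JarEntry(className.replace('.', '/') + ".class"));
            out.write(bytes);
            out.closeEntry();
        }
        return jar;
    }

    /** Loads the class through a loader that sees only the JAR, then calls hello(). */
    public static String loadAndCall(Path jar, String className) throws Exception {
        URL[] urls = {jar.toUri().toURL()};
        try (URLClassLoader loader = new URLClassLoader(urls, null)) {
            Class<?> loaded = loader.loadClass(className);
            Object instance = loaded.getDeclaredConstructor().newInstance();
            return (String) loaded.getMethod("hello").invoke(instance);
        }
    }

    public static void main(String[] args) throws Exception {
        String name = Sample.class.getName();
        Path jar = writeJar(name, classBytes(Sample.class));
        System.out.println(loadAndCall(jar, name));
    }
}
```

Because the loader in `loadAndCall` has no parent, a successful call shows that the JAR alone is enough to resolve the class by name, which is the property a class loader on another machine would depend on.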
OK, I think I figured it out, though I'm not sure of the exact reason:

It seems that I need the stream type to be a generic type of the super class rather than a POJO of the concrete generated class. Otherwise, it seems the operator definition cannot load the POJO class at task creation. So I don't declare the map's produced type as the concrete generated class, and I work around keyBy, which then cannot use a field name, by using a key selector. Doing all of that seems to work.

I will be happy to hear more in depth about the reason for it, if anyone knows.
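The key-selector workaround can be illustrated without Flink. Everything in this sketch is an assumption made for illustration: the thread does not show MySuperClass, so the `get(String)` accessor is hypothetical, `MapBacked` stands in for a generated subclass, and a `java.util.function.Function` plays the role that a key selector would play in the job. The idea is that the key is extracted through the super class, so no code refers to the generated type or its field names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class SuperClassKeying {

    /** Hypothetical super class: fields are read by name through the base type. */
    public abstract static class MySuperClass {
        public abstract Object get(String fieldName);
    }

    /** Stand-in for a generated subclass; here the fields simply live in a map. */
    public static class MapBacked extends MySuperClass {
        private final Map<String, Object> fields = new HashMap<>();

        public MapBacked with(String name, Object value) {
            fields.put(name, value);
            return this;
        }

        @Override
        public Object get(String fieldName) {
            return fields.get(fieldName);
        }
    }

    /** Key extractor typed against the super class only. */
    public static Function<MySuperClass, Object> keyOf(String fieldName) {
        return record -> record.get(fieldName);
    }

    /** Groups records by the extracted key, keeping keys in first-seen order. */
    public static Map<Object, List<MySuperClass>> groupBy(
            List<MySuperClass> records, Function<MySuperClass, Object> key) {
        Map<Object, List<MySuperClass>> groups = new LinkedHashMap<>();
        for (MySuperClass record : records) {
            groups.computeIfAbsent(key.apply(record), k -> new ArrayList<>()).add(record);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<MySuperClass> records = new ArrayList<>();
        records.add(new MapBacked().with("user", "a").with("n", 1));
        records.add(new MapBacked().with("user", "b").with("n", 2));
        records.add(new MapBacked().with("user", "a").with("n", 3));
        Map<Object, List<MySuperClass>> groups = groupBy(records, keyOf("user"));
        System.out.println(groups.get("a").size() + " " + groups.get("b").size());
    }
}
```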