Hi,

I'm trying to load a FlinkKafkaProducer sink alongside another custom sink. While trying to restore a running Flink app from its previous state, I get the error message below. I am running Flink 1.9.0 with the following SBT dependency added:

"org.apache.flink" %% "flink-connector-kafka" % "1.9.0"

The app is deployed via a standard uber jar with all the dependencies. Would appreciate the help.

java.lang.LinkageError: loader constraint violation: loader (instance of org/apache/flink/util/ChildFirstClassLoader) previously initiated loading for a different type with name "org/apache/kafka/clients/producer/ProducerRecord"
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.security.AccessController.doPrivileged(Native Method)
    at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:66)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.getDeclaredMethod(Class.java:2128)
    at java.security.AccessController.doPrivileged(Native Method)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.readObject(FlinkKafkaProducer.java:1202)
    at sun.reflect.GeneratedMethodAccessor358.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:235)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:427)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:370)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)

Best Regards,
Yuval Itzchakov.
Hi Yuval,

How do you execute Flink? Can you show us the log entry with the classpath?

I'm guessing that you have Kafka bundled in your uber jar and additionally also have the connector in flink-dist/lib. If so, you simply need to remove it in one place. In general, if you use flink-dist, you wouldn't bundle any Flink dependencies in your uber jar (use provided scope for them). If you have everything bundled in one uber jar and execute it somehow without flink-dist, then I don't immediately see a solution. In that case, the log with the classpath would help.

Best,

Arvid

On Sun, Aug 23, 2020 at 1:37 PM Yuval Itzchakov <[hidden email]> wrote:
--
Arvid Heise | Senior Java Developer

Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
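Arvid's suggestion (provided scope for anything that ships with flink-dist, bundling only the connector) might look roughly like this in sbt. This is a sketch: the Kafka connector coordinates and version are the ones from the thread; the other module names are assumed as typical examples.

```scala
// build.sbt sketch: keep flink-dist-provided dependencies out of the uber jar,
// and bundle only the connector, which is not in flink-dist/lib.
val flinkVersion = "1.9.0"

libraryDependencies ++= Seq(
  // provided: these ship with flink-dist, so don't bundle them (assumed modules)
  "org.apache.flink" %% "flink-scala"           % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  // bundled: connectors go into the uber jar
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion
)
```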
Small correction: you'd bundle the connectors in your uber jar like you did, but you usually don't put them into flink-dist. So please double-check whether the connector is also in flink-dist and remove it there. If not, then please add the full classpath log statement. It might also be a bug related to restoring and the way Flink loads the classes then, but I'd first eliminate the obvious.

On Tue, Aug 25, 2020 at 5:12 PM Arvid Heise <[hidden email]> wrote:
Hi Arvid,

I'm running Flink in a job cluster on k8s using the Lyft Operator. The Flink image that I'm building does not have the flink-connector-kafka library in its JAR; I've made sure of this using `jar -tf`. Additionally, once I removed the dependency from my uber jar, it failed at runtime with a NoSuchMethodException on a seemingly arbitrary method.

I used classloader.resolve-order: parent-first and it resolved the issue somehow. I still don't know why, though.

On Tue, Aug 25, 2020 at 6:13 PM Arvid Heise <[hidden email]> wrote:
Best Regards,
Yuval Itzchakov.
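For reference, the workaround Yuval describes is a one-line change to flink-conf.yaml. The key below is the standard Flink option (the default is child-first); note that it masks duplicate-classpath problems rather than fixing them.

```yaml
# flink-conf.yaml: fall back to parent-first (plain Java) class loading.
# Workaround only -- the duplicate jar is still on two classpaths.
classloader.resolve-order: parent-first
```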
Will it be enough to provide you with the output of `-verbose:class`? Or do you want me to add additional arguments?

On Tue, Aug 25, 2020 at 6:20 PM Arvid Heise <[hidden email]> wrote:
Best Regards,
Yuval Itzchakov.
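`-verbose:class` logs every class as it is defined, which is exactly what's needed to see which jar each `ProducerRecord` copy came from. One way to pass it through Flink's configuration is sketched below; `env.java.opts` is a standard Flink option, though whether the Lyft operator forwards it is an assumption.

```yaml
# flink-conf.yaml: have the JobManager/TaskManager JVMs log each loaded class
# and the source it was loaded from.
env.java.opts: "-verbose:class"
```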
The NoSuchMethodException shows that the class is still on the classpath, but with a different version than the one your code is expecting; otherwise you would've gotten a different error.

This implies that there were two versions of the Kafka dependencies on the classpath in your original run; it suddenly working with parent-first classloading reinforces the suspicion that they are present in the distribution.

As Arvid mentioned, the classpath log entry (at the very start of the log file) would be interesting.

Did you build the Flink distribution yourself, or are you relying on one of the existing Flink binaries/images?

On 25/08/2020 20:51, Yuval Itzchakov wrote:
I'm afraid it's not being printed out due to different log levels :(

Yes, I build the image myself. It takes the tar file from https://archive.apache.org/dist/flink/flink-1.9.0/ and unpacks it into the image. I've run:

find . -iname "*.jar" | xargs -n 1 jar tf | grep -i producerrecord
find . -iname "*.jar" | xargs -n 1 jar tf | grep -i kafka

Both from within /lib; they both produce no results.

On Tue, Aug 25, 2020 at 10:07 PM Chesnay Schepler <[hidden email]> wrote:
Best Regards,
Yuval Itzchakov.
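A portable variant of the `jar tf | grep` check above, for what it's worth: since jars are just zip archives, a short script can walk a directory and report exactly which jars bundle a given class. This is a sketch in Python; the directory layout and class name are only the ones from this thread.

```python
import os
import zipfile

def jars_containing(lib_dir, class_name):
    """Return paths of .jar files under lib_dir that contain class_name."""
    # A class "a.b.C" is stored in the jar as the entry "a/b/C.class".
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for root, _dirs, files in os.walk(lib_dir):
        for name in files:
            if not name.endswith(".jar"):
                continue
            path = os.path.join(root, name)
            with zipfile.ZipFile(path) as jar:
                if entry in jar.namelist():
                    hits.append(path)
    return hits

if __name__ == "__main__":
    # Hypothetical paths: scan flink-dist's lib for the class from the error.
    for p in jars_containing("/opt/flink/lib",
                             "org.apache.kafka.clients.producer.ProducerRecord"):
        print(p)
```

Finding the same class in more than one place (e.g. in lib/ and in the submitted uber jar) is exactly the duplication that triggers the LinkageError.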
OK, I think I figured it out. It looks like the uber jar is also being placed under `lib`, which is probably the cause of the problem. The question is, why does it identify it as two different versions? It's exactly the same JAR.

On Tue, Aug 25, 2020 at 10:22 PM Yuval Itzchakov <[hidden email]> wrote:
Best Regards,
Yuval Itzchakov.
The simplest answer is that they are in fact not equal; maybe it is a jar from an older version of your setup?

Can you give some details on the NoSuchMethodException? Specifically, whether it tries to access something from the Kafka connector, or from your own user code.

On 25/08/2020 21:27, Yuval Itzchakov wrote:
They are definitely equal; the same JAR is copied in subsequent lines of the Dockerfile. Regarding the NoSuchMethodException, I'll look it up and let you know tomorrow.

On Tue, Aug 25, 2020, 22:59 Chesnay Schepler <[hidden email]> wrote:
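If the stray copy under `lib` really comes from the image build, the fix would be to ship the uber jar only once, outside flink-dist's lib/. A hypothetical Dockerfile sketch (base image tag and jar paths are assumptions, not from the thread):

```dockerfile
FROM flink:1.9.0-scala_2.12

# Problematic: copying the uber jar into lib/ also puts it on the parent
# (app) classloader's classpath, alongside the child-first user-code copy.
# COPY target/my-app-assembly.jar /opt/flink/lib/

# Keep it in a separate directory and point the job submission at it instead.
COPY target/my-app-assembly.jar /opt/flink/job/my-app-assembly.jar
```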
Hi [hidden email],

The issue is that the uber jar is first loaded with Flink's app classloader (because it's in lib), and then, when the application starts, it gets loaded again in the ChildFirstClassLoader; since that loader is child-first, the class is loaded a second time anyway.

What I don't quite understand is why the Kafka class was loaded through the app classloader to begin with. Since Yuval mentioned that it happens only during restoration, I suspect that Flink is not using the correct classloader at some point. Unfortunately, I don't know an easy way to trace the first loading (we have the stack trace for the second loading, but I think that one is legitimate).

On Tue, Aug 25, 2020 at 11:24 PM Yuval Itzchakov <[hidden email]> wrote: