Hi,

I added my custom jar (which includes dependencies on Jackson) to the Flink classpath. It seems to be loaded just fine, but when the job starts I get the exception below. I am not sure how to interpret the exception, though, and would appreciate it if somebody could give me advice on it.

Thanks,
Alex

2020-11-20 18:34:35,643 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Flat Map -> Flat Map -> Sink: Unnamed (1/1) (dcbf799dadba5d4b7e7f5af15919a4b6) switched from RUNNING to FAILED.
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:275) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:459) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.2.jar:1.11.2]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
Caused by: java.lang.ClassCastException: cannot assign instance of java.util.concurrent.ConcurrentHashMap to field com.fasterxml.jackson.databind.deser.DeserializerCache._cachedDeserializers of type com.fasterxml.jackson.databind.util.LRUMap in instance of com.fasterxml.jackson.databind.deser.DeserializerCache
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301) ~[?:1.8.0_265]
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:260) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    ... 8 more
Are you using ObjectMapper as a non-transient field? If so, please make it transient and initialize it in open() of a Rich*Function.

On Fri, Nov 20, 2020 at 7:56 PM Alexander Bagerman <[hidden email]> wrote:
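For reference, the pattern Arvid describes can be sketched without Flink or Jackson at all (the Rich*Function and ObjectMapper are replaced here by plain stdlib stand-ins, so the snippet is self-contained and runnable): a transient field is skipped by Java serialization, so the deserialized copy must re-create it, which is exactly what open() is for in a Rich*Function.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {
    // Stand-in for a Flink Rich*Function that holds a Jackson ObjectMapper.
    static class MyFunction implements Serializable {
        // transient: excluded from the serialized job graph, so the helper
        // object is never shipped over the wire.
        transient StringBuilder mapper; // hypothetical stand-in for ObjectMapper

        // Corresponds to RichFunction#open(): rebuild the helper on the worker.
        void open() {
            mapper = new StringBuilder("ready");
        }
    }

    // Round-trips an object through Java serialization, analogous to what
    // happens when a user function is shipped to a task manager.
    static MyFunction roundTrip(MyFunction f) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(f);
        }
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (MyFunction) ois.readObject();
        }
    }

    // true iff the transient field survived serialization (it never does).
    static boolean fieldSurvivedSerialization() throws Exception {
        MyFunction f = new MyFunction();
        f.open();
        return roundTrip(f).mapper != null;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(fieldSurvivedSerialization()); // false
        MyFunction copy = roundTrip(new MyFunction());
        copy.open(); // re-initialize after deserialization, as open() would
        System.out.println(copy.mapper); // ready
    }
}
```

A non-transient ObjectMapper, by contrast, gets written into the job graph with all of its internal caches, which is where the LRUMap/ConcurrentHashMap mismatch in the stack trace comes from.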
--
Arvid Heise | Senior Java Developer
Follow us @VervericaData
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
Thanks, Arvid. That is what I thought too. I went through all the instances where it might have been a member variable and made sure that it's declared as transient :-( Is there anything else I can check?

Alex

On Fri, Nov 20, 2020 at 11:50 AM Arvid Heise <[hidden email]> wrote:
The easiest solution for all non-trivial issues like this is to start the application in a local executor, so you can debug it in your IDE.

Additionally, double-check that you have no lambdas/anonymous classes that reference outer classes holding an ObjectMapper. ObjectMapper should also be static, as it's thread-safe once configured, so you can also check that.

On Fri, Nov 20, 2020 at 8:55 PM Alexander Bagerman <[hidden email]> wrote:
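Arvid's point about a static ObjectMapper can also be demonstrated with a stdlib round-trip (again with a placeholder string standing in for the mapper): static fields belong to the class, not the instance, so Java serialization never writes them into the byte stream at all, and nothing mapper-related travels with the job graph.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class StaticFieldDemo {
    static class MyFunction implements Serializable {
        // Static: class-level state, never part of an instance's
        // serialized form.
        static final String MAPPER = "shared-mapper"; // stand-in for a static ObjectMapper
        int instanceState = 42; // only instance fields travel
    }

    // Serializes an object and returns the raw byte stream.
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = serialize(new MyFunction());
        // The stream records the class descriptor and instance fields,
        // but not the static field's value.
        String dump = new String(bytes, StandardCharsets.ISO_8859_1);
        System.out.println(dump.contains("shared-mapper")); // false
        System.out.println(dump.contains("instanceState")); // true
    }
}
```

This is why a static (or transient) mapper sidesteps the ClassCastException: the mapper's internal caches are simply never serialized.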
I just switched from providing my jar when creating a remote environment to providing this jar on Flink's classpath. It used to work just fine when the jar was shipped to Flink with the job graph. Now that the jar is available to Flink at startup, the same job that used to run is failing with the exception I provided. I suspect that it might be a classloader issue, but am not sure.

On Fri, Nov 20, 2020 at 12:10 PM Arvid Heise <[hidden email]> wrote:
Oh no, please never put user code (with included libraries) into Flink's classpath. It's not supported, precisely for classloader reasons. Why did you think that this would be a good approach? Is your jar too big? Maybe a different deployment mode would be more appropriate? [1]

Alternatively, if you want to go the hacky route, you could also try to shade your dependencies.

On Fri, Nov 20, 2020 at 9:18 PM Alexander Bagerman <[hidden email]> wrote:
A couple of reasons I've done that:
- It's listed as an option under "optional libraries" here: https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html#advanced-customization
- I have over 200 jobs running that rely on the same core functionality provided by the jar in question, and it seems somewhat wasteful to do 200 uploads of 80M+ of the same code to the cluster.
- I was having issues shipping the jar's content when it was packaged as part of my "fat" application jar. I was providing a URL to the jar (jar:file:xxxx!/xxx) but Flink could not find it there.

On Fri, Nov 20, 2020 at 12:29 PM Arvid Heise <[hidden email]> wrote:
Hm, yes, those are good reasons. The issue is that if you put the jar into Flink's classpath, it becomes part of Flink's system classloader, so there is no way to unload classes or to protect Flink's classes (plus its dependencies) from being overwritten by your dependencies. I suspect this causes differences in the serialization and deserialization of the task configuration.

If you want to go hardcore, you could list all class files in your jar and in the remaining libs and check for intersections. The intersecting classes should be:
- excluded from your jar, or
- the exact same version (but then you could just include them to reduce size), or
- shaded within your jar.

On Fri, Nov 20, 2020 at 9:36 PM Alexander Bagerman <[hidden email]> wrote:
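Arvid's "list and intersect" check can be sketched with stdlib Java alone, since jars are just zip files (the jar paths in main are placeholders; point them at your application jar and a Flink lib jar):

```java
import java.io.IOException;
import java.nio.file.Path;
import java.util.Enumeration;
import java.util.Set;
import java.util.TreeSet;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

public class JarIntersection {
    // Collects the names of all .class entries inside a jar.
    static Set<String> classEntries(Path jar) throws IOException {
        Set<String> names = new TreeSet<>();
        try (ZipFile zip = new ZipFile(jar.toFile())) {
            Enumeration<? extends ZipEntry> entries = zip.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (name.endsWith(".class")) {
                    names.add(name);
                }
            }
        }
        return names;
    }

    // Classes present in both jars: candidates for exclusion, version
    // alignment, or shading, per the three options above.
    static Set<String> intersection(Path a, Path b) throws IOException {
        Set<String> common = classEntries(a);
        common.retainAll(classEntries(b));
        return common;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java JarIntersection my-app.jar flink-lib.jar
        if (args.length == 2) {
            intersection(Path.of(args[0]), Path.of(args[1]))
                .forEach(System.out::println);
        }
    }
}
```

Any class name this prints is loaded by Flink's system classloader in one version and bundled in your jar in another, which is exactly the situation that produces mismatches like the DeserializerCache ClassCastException.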
-- Arvid Heise | Senior Java Developer Follow us @VervericaData -- Join Flink Forward - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Ververica GmbHRegistered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng |