Jackson de/serialization exception?

Alexander Bagerman
Hi,
I added my custom jar (which includes dependencies on Jackson) to the Flink classpath. It seems to be loaded just fine, but when the job starts I get the exception below. I am not sure how to interpret the exception and would appreciate any advice on it.
Thanks
Alex

2020-11-20 18:34:35,643 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Flat Map -> Flat Map -> Sink: Unnamed (1/1) (dcbf799dadba5d4b7e7f5af15919a4b6) switched from RUNNING to FAILED.
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:275) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:459) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.2.jar:1.11.2]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
Caused by: java.lang.ClassCastException: cannot assign instance of java.util.concurrent.ConcurrentHashMap to field com.fasterxml.jackson.databind.deser.DeserializerCache._cachedDeserializers of type com.fasterxml.jackson.databind.util.LRUMap in instance of com.fasterxml.jackson.databind.deser.DeserializerCache
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301) ~[?:1.8.0_265]
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2372) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:483) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:441) ~[?:1.8.0_265]
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:260) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    ... 8 more

Re: Jackson de/serialization exception?

Arvid Heise-3
Are you using ObjectMapper as a non-transient field? If so, please make it transient and initialize it in open() of a Rich*Function.
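A minimal sketch of the transient-plus-open() pattern described above, written against the JDK only so it runs without Flink or Jackson on the classpath. HeavyParser is a hypothetical stand-in for ObjectMapper, and ParsingFunction.open() only imitates the open() hook of a Rich*Function; neither is a Flink or Jackson API. The round trip stands in for the function being serialized on the client and deserialized on the task manager.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Locale;

final class TransientFieldSketch {

    /** Hypothetical stand-in for ObjectMapper (assumption, not a Jackson class). */
    static final class HeavyParser {
        String parse(String raw) {
            return raw.trim().toUpperCase(Locale.ROOT);
        }
    }

    /** Hypothetical stand-in for a Rich*Function (assumption, not a Flink class). */
    static final class ParsingFunction implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: Java serialization skips this field, so the parser is never shipped
        private transient HeavyParser parser;

        /** Imitates open(): builds the parser on the worker, after deserialization. */
        void open() {
            parser = new HeavyParser();
        }

        String map(String value) {
            return parser.parse(value);
        }

        boolean isOpened() {
            return parser != null;
        }
    }

    /** Serializes and deserializes an object, roughly what happens when a function is shipped. */
    static <T extends Serializable> T roundTrip(T original) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        ParsingFunction shipped = roundTrip(new ParsingFunction());
        shipped.open(); // the parser is created only after the function has arrived
        System.out.println(shipped.map("  hello "));
    }
}
```

With this layout the serialized function carries no parser state at all, so nothing from the parser's class needs to be reconstructed by ObjectInputStream on the receiving side.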



--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   

Re: Jackson de/serialization exception?

Alexander Bagerman
Thanks, Arvid,
That is what I thought too. I went through all the places where it might have been a member variable and made sure that it's declared as transient :-( Is there anything else I can check?
Alex

Re: Jackson de/serialization exception?

Arvid Heise-3
The easiest solution for all non-trivial issues like this is to start the application locally in a local executor, so you can debug in your IDE.

Additionally, double-check that you have no lambdas or anonymous classes that reference outer classes holding an ObjectMapper. ObjectMapper can also be a static field, since it is thread-safe once configured, so check that as well.
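A small JDK-only sketch of the capture problem mentioned above. HeavyParser, Mapper and the two job classes are hypothetical names made up for illustration. The stand-in parser is deliberately not Serializable, so the hidden reference to the outer instance shows up as a NotSerializableException; the real ObjectMapper is Serializable, so in a real job it would instead travel silently with the function.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class OuterCaptureSketch {

    /** Hypothetical stand-in for ObjectMapper; not Serializable on purpose. */
    static final class HeavyParser {
        String parse(String raw) {
            return raw.trim();
        }
    }

    /** Hypothetical serializable function type (assumption, not a Flink interface). */
    interface Mapper extends Serializable {
        String map(String value);
    }

    /** Keeps the parser in an instance field and hands out an anonymous function. */
    static final class JobWithInstanceField implements Serializable {
        private static final long serialVersionUID = 1L;
        private final HeavyParser parser = new HeavyParser();

        Mapper anonymousMapper() {
            // Using the outer field makes the anonymous class hold the enclosing instance
            return new Mapper() {
                @Override
                public String map(String value) {
                    return parser.parse(value);
                }
            };
        }
    }

    /** Keeps the parser in a static field; static fields are never serialized. */
    static final class JobWithStaticField {
        private static final HeavyParser PARSER = new HeavyParser();

        static Mapper staticMapper() {
            // Created in a static context, so there is no enclosing instance to capture
            return new Mapper() {
                @Override
                public String map(String value) {
                    return PARSER.parse(value);
                }
            };
        }
    }

    /** Returns "ok" or names the class that could not be serialized. */
    static String trySerialize(Object candidate) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return "ok";
        } catch (NotSerializableException e) {
            return "not serializable: " + e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(trySerialize(new JobWithInstanceField().anonymousMapper()));
        System.out.println(trySerialize(JobWithStaticField.staticMapper()));
    }
}
```

The first call reports the parser class even though the function itself never declares a parser field, which is the kind of indirect reference worth hunting for when every visible field is already transient.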

Re: Jackson de/serialization exception?

Alexander Bagerman
I just switched from providing my jar when creating a remote environment to putting this jar on Flink's classpath. It used to work just fine when the jar was shipped to Flink with the job graph. Now that the jar is available to Flink at startup, the same job that used to run fails with the exception I provided. I suspect that it might be a class loader issue, but I am not sure.
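One way to test the class loader suspicion is to log where a class was actually loaded from, on both the client and the task manager. The helper below is a JDK-only sketch; describeOrigin is a hypothetical name, not a Flink utility. In the failing job it could be called with the Jackson classes named in the stack trace (for example DeserializerCache). If the two sides report different jars, that would be consistent with the ConcurrentHashMap versus LRUMap mismatch in the exception, though this is only a guess from the message.

```java
import java.security.CodeSource;

final class ClassOriginSketch {

    /** Describes which class loader and which jar or directory a class came from. */
    static String describeOrigin(Class<?> type) {
        ClassLoader loader = type.getClassLoader();
        CodeSource source = type.getProtectionDomain().getCodeSource();

        // JDK core classes report a null loader, which denotes the bootstrap loader
        String loaderName = (loader == null) ? "bootstrap" : loader.getClass().getName();
        String location = (source == null || source.getLocation() == null)
                ? "unknown"
                : source.getLocation().toString();

        return type.getName() + " <- loader=" + loaderName + ", location=" + location;
    }

    public static void main(String[] args) {
        // Replace these with the classes under suspicion in a real job
        System.out.println(describeOrigin(String.class));
        System.out.println(describeOrigin(ClassOriginSketch.class));
    }
}
```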

On Fri, Nov 20, 2020 at 12:10 PM Arvid Heise <[hidden email]> wrote:
The easiest solution for all non-trivial issues like this is to start the application locally in a local executor, so you can debug in your IDE.

Additionally, double-check that you have no lambdas/anonymous classes that reference outer classes with ObjectMapper. ObjectMapper should also be static as it's fully immutable, so you can also check that.

On Fri, Nov 20, 2020 at 8:55 PM Alexander Bagerman <[hidden email]> wrote:
Thanks, Arvid,
That is what I thought too. I went through all the instances where it might 've been a member variable and made sure that it's declared as transient :-( Is there anything else I can check?
Alex

On Fri, Nov 20, 2020 at 11:50 AM Arvid Heise <[hidden email]> wrote:
Are you using ObjectMapper as a non-transient field? If so, please make it transient and initialize in open() of a Rich*Function.

On Fri, Nov 20, 2020 at 7:56 PM Alexander Bagerman <[hidden email]> wrote:
Hi,
I added my custom jar (that includes dependencies on Jackson) to Flink classpath. It seems to be loaded just fine. But when the job starts I am getting an exception below. I am sure how to interpret the exception though and would appreciate it if somebody gives me advice on it.
Thanks
Alex

2020-11-20 18:34:35,643 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Flat Map -> Flat Map -> Sink: Unnamed (1/1) (dcbf799dadba5d4b7e7f5af15919a4b6) switched from RUNNING to FAILED.
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:275) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:459) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.2.jar:1.11.2]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
Caused by: java.lang.ClassCastException: cannot assign instance of java.util.concurrent.ConcurrentHashMap to field com.fasterxml.jackson.databind.deser.DeserializerCache._cachedDeserializers of type com.fasterxml.jackson.databind.util.LRUMap in instance of com.fasterxml.jackson.databind.deser.DeserializerCache
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301) ~[?:1.8.0_265]
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2372) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:483) ~[?:1.8.0_265]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:441) ~[?:1.8.0_265]
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:260) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    ... 8 more


--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   



Re: Jackson de/serialization exception?

Arvid Heise-3
Oh no, please never put user code (with its bundled libraries) on Flink's classpath. That is unsupported, precisely for classloader reasons. What made you choose this approach? Is your jar too big?

Maybe a different deployment mode would be more appropriate? [1]

Alternatively, if you want to go the hacky route, you could also try to shade your dependencies.





Re: Jackson de/serialization exception?

Alexander Bagerman
A couple of reasons why I did that:
- I have over 200 jobs running that rely on the same core functionality provided by the jar in question, and it seems somewhat wasteful to upload the same 80M+ of code to the cluster 200 times.
- I was having issues shipping the jar when it was packaged as part of my "fat" jar application. I was providing a URL to the jar (jar:file:xxxx!/xxx), but Flink could not find it there.





Re: Jackson de/serialization exception?

Arvid Heise-3
Hm, yes, those are good reasons. The issue is that if you put the jar on Flink's classpath, it is loaded by Flink's system classloader, so there is no way to unload its classes or to protect Flink's classes (and their dependencies) from being overridden by your dependencies. I suspect this causes a mismatch between the serialization and the deserialization of the task configuration.
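
One way to probe that suspicion is to print, on both the client and the TaskManager, which jar and which classloader a given class actually comes from. The following is only a minimal sketch under assumptions: `ClassOrigin` and `describe` are hypothetical names (not part of Flink or Jackson), and the code relies solely on standard JDK reflection calls.

```java
import java.net.URL;
import java.security.CodeSource;

final class ClassOrigin {

    /** Describes where a class was loaded from and by which classloader. */
    static String describe(Class<?> c) {
        // The code source may be null, e.g. for JDK classes from the bootstrap loader.
        CodeSource source = c.getProtectionDomain().getCodeSource();
        URL location = (source == null) ? null : source.getLocation();
        ClassLoader loader = c.getClassLoader(); // null means the bootstrap loader
        return c.getName()
                + " <- " + (location == null ? "(no code source)" : location.toString())
                + " via " + (loader == null ? "bootstrap loader" : loader.toString());
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Pass a fully qualified class name; defaults to this class itself.
        Class<?> target = args.length > 0 ? Class.forName(args[0]) : ClassOrigin.class;
        System.out.println(describe(target));
    }
}
```

Running it with `com.fasterxml.jackson.databind.deser.DeserializerCache` (the class named in the exception) on each side would show whether both sides resolve it from the same jar; if the locations differ, two Jackson versions are likely in play.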

If you want to go hardcore, you could list all class files in your jar and in the remaining libs and check for intersections. Each class in the intersection should be
- excluded from your jar, or
- present in exactly the same version (but then you could just exclude it to reduce size), or
- shaded within your jar.
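
The intersection check described above can be sketched in plain Java. This is an illustration under assumptions, not a Flink tool: `JarOverlap`, `classEntries`, and `overlap` are hypothetical names, and the jar paths are whatever you pass on the command line (for example your jar followed by the jars in Flink's lib directory).

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.jar.JarFile;
import java.util.stream.Collectors;

final class JarOverlap {

    /** Returns the names of all .class entries in the given jar, sorted. */
    static Set<String> classEntries(Path jar) throws IOException {
        try (JarFile jf = new JarFile(jar.toFile())) {
            return jf.stream()
                    .map(e -> e.getName())
                    .filter(n -> n.endsWith(".class"))
                    .collect(Collectors.toCollection(TreeSet::new));
        }
    }

    /** Class entries found in the user jar and in at least one of the other jars. */
    static Set<String> overlap(Path userJar, List<Path> otherJars) throws IOException {
        Set<String> mine = classEntries(userJar);
        Set<String> shared = new TreeSet<>();
        for (Path other : otherJars) {
            Set<String> theirs = classEntries(other);
            theirs.retainAll(mine); // keep only entries that also exist in the user jar
            shared.addAll(theirs);
        }
        return shared;
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.err.println("usage: JarOverlap <user.jar> <other.jar>...");
            return;
        }
        List<Path> others = Arrays.stream(args, 1, args.length)
                .map(Paths::get)
                .collect(Collectors.toList());
        overlap(Paths.get(args[0]), others).forEach(System.out::println);
    }
}
```

The sketch compares entry names only, so it cannot tell whether two copies of a class are the same version; entries such as `module-info.class` may also show up as harmless overlaps.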


