Hello everyone,
Coming from the Spring/CDI world, I'm new to Flink and to stream processing in general, so I apologize for the very basic questions. I wrote a simple Flink job with all functions inlined in the main method. The main method has some static variables, such as an HTTP client and a Guava cache, to avoid repeating the same requests to external APIs.

Everything works fine in the IDE. However, after refactoring the functions into separate classes I got a NotSerializableException, because obviously one cannot inject anything into functions unless it is serializable, and a Guava cache isn't.

Am I right in assuming that every object needed by a function must be created by the function itself?
What is the right way to instantiate the objects (in the constructor or in the open method of the function)?
Can functions have non-serializable fields at all?

What are the best practices for managing dependencies in Flink programs?

Maybe you can also point me to a tutorial for beginners?

Kind regards,
Pan
|
In general I'd expect that every class with state that you use in Flink will be serialised, so you should mark your classes as Serializable and set a serialVersionUID.
I have what sounds like a very similar problem to yours. I need to use a non-serializable component in my streaming Flink job; in fact, it is also a Guava cache. Only one area of the application requires this cache, i.e. the cache is only called by one class.

In the serializable class that requires access to the cache, I've handled it like I would any non-serializable property. The property that references the cache container class is marked as transient, and a hook into the deserialization process (https://docs.oracle.com/javase/8/docs/api/java/io/Serializable.html) is added, in which the cache container is initialised on deserialization.

On 2018/03/16 11:35:01, Pan Glust <[hidden email]> wrote:
> Hello everyone,
> [...] |
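The transient-field-plus-deserialization-hook approach described in this reply could look roughly like the sketch below. It is only an illustration under stated assumptions: `CachingLookup` and `TransientCacheDemo` are hypothetical names, a `ConcurrentHashMap` stands in for the Guava cache so the example has no external dependencies, and the in-memory round trip only approximates what happens when a function object is shipped to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical class: stands in for the serializable class that needs the cache.
class CachingLookup implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: skipped by Java serialization. A ConcurrentHashMap stands in
    // for the Guava cache (assumption made to keep the sketch dependency-free).
    private transient Map<String, String> cache = new ConcurrentHashMap<>();

    String lookup(String key) {
        // Placeholder for the expensive call to an external API.
        return cache.computeIfAbsent(key, k -> "value-for-" + k);
    }

    int cachedEntries() {
        return cache.size();
    }

    // Deserialization hook: constructors and field initializers of a
    // Serializable class do not run on deserialization, so the transient
    // field has to be rebuilt here.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        cache = new ConcurrentHashMap<>();
    }
}

class TransientCacheDemo {
    // Serializes and deserializes an object in memory.
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CachingLookup original = new CachingLookup();
        original.lookup("a");                       // fills the original's cache
        CachingLookup copy = roundTrip(original);
        System.out.println(copy.cachedEntries());   // 0: the cache was rebuilt empty
        System.out.println(copy.lookup("a"));       // value-for-a
    }
}
```

Without the `readObject` hook, the `cache` field of the deserialized copy would be `null` and the first `lookup` call would fail with a `NullPointerException`.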
Hi,
I'm having the same problem now. What is your approach now that you have gained some experience? Also, do you use Spring DI to set up/initialize your jobs/process functions? -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Hi KristoffSC,

it would be better to open a new thread. Users rarely check threads on the user list that are more than a year old.

In general, if you have a cache, you usually don't want to serialize it. So add the cache as a field inside the respective function (rewriting a lambda as an anonymous class if necessary) and make the field transient to avoid serialization. Be aware that you usually want to initialize the cache in the function's open() method, so you need to use a RichXFunction (for example, RichMapFunction).

Best,
Arvid

On Fri, Dec 6, 2019 at 1:23 PM KristoffSC <[hidden email]> wrote:
> Hi, |
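The lifecycle behind this advice (a transient field that is left out of serialization and created in open()) can be sketched in plain Java as below. This is an illustration under stated assumptions, not Flink code: `RichFunctionSketch` is a hypothetical stand-in for Flink's rich function base classes such as RichMapFunction, so that the example runs without Flink on the classpath, and `CachedEnrichment`, `OpenLifecycleDemo` and `runOnWorker` are made-up names. A `HashMap` stands in for the cache. In a real job you would extend the Flink class and override its open method, whose exact signature depends on the Flink version.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a Flink rich function base class.
abstract class RichFunctionSketch<I, O> implements Serializable {
    private static final long serialVersionUID = 1L;

    // Called once on the worker, after deserialization and before any record.
    void open() throws Exception {}

    abstract O map(I value) throws Exception;
}

class CachedEnrichment extends RichFunctionSketch<String, String> {
    private static final long serialVersionUID = 1L;

    // Serializable configuration travels with the function.
    private final String prefix;

    // Not serialized; stays null until open() is called.
    private transient Map<String, String> cache;

    CachedEnrichment(String prefix) {
        this.prefix = prefix;
    }

    @Override
    void open() {
        cache = new HashMap<>();
    }

    @Override
    String map(String key) {
        // Placeholder for the expensive call to an external API.
        return cache.computeIfAbsent(key, k -> prefix + k);
    }
}

class OpenLifecycleDemo {
    // Mimics the runtime: serialize on the client, deserialize on the worker,
    // call open() once, then process a record.
    static String runOnWorker(CachedEnrichment fn, String input) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                CachedEnrichment shipped = (CachedEnrichment) in.readObject();
                shipped.open();
                return shipped.map(input);
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnWorker(new CachedEnrichment("enriched-"), "k1")); // enriched-k1
    }
}
```

Compared with a custom deserialization hook, initializing in open() keeps resource setup in one lifecycle method that the framework calls explicitly, which is presumably why it is the approach recommended here.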
Hi Arvid Heise-3,
Thanks for your answer. I took this approach. I did not want to start a new thread since I wanted to avoid "subject duplication" :) Regards, Krzysztof |