Basic question about Flink programs

Pan Glust
Hello everyone,

Coming from the Spring/CDI world, I'm new to Flink and to stream processing in general, so apologies for the very basic questions. I wrote a simple Flink job with all functions inlined in the main method. The main class has some static fields, such as an HTTP client and a Guava cache that avoids sending the same requests to external APIs repeatedly.

Everything works fine in the IDE. However, after refactoring the functions into separate classes I got a NotSerializableException, because obviously one cannot inject anything into a function unless it is serializable, and a Guava cache isn't.
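
For anyone hitting the same error, here is a minimal plain-Java reproduction. It assumes that a function object is shipped with standard Java serialization (Flink's function interfaces extend java.io.Serializable), so any ordinary field holding a non-serializable object makes the whole function fail. LookupCache and EnrichFunction are hypothetical names standing in for the Guava cache and the refactored function; no Flink classes are used.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

public class SerializationFailureDemo {

    /** Hypothetical stand-in for a Guava cache or HTTP client: deliberately NOT Serializable. */
    static class LookupCache {
        private final Map<String, String> entries = new HashMap<>();

        String get(String key) {
            return entries.computeIfAbsent(key, k -> "value-for-" + k);
        }
    }

    /** Hypothetical function refactored into its own class, holding the cache in an ordinary field. */
    static class EnrichFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        private final LookupCache cache = new LookupCache(); // serialized together with the function

        String map(String key) {
            return key + "=" + cache.get(key);
        }
    }

    /** Serializes an object with plain Java serialization. */
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    /** Returns "ok" if serialization succeeds, otherwise the name of the offending class. */
    static String trySerialize(Object o) {
        try {
            serialize(o);
            return "ok";
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Prints the name of the non-serializable class, e.g. SerializationFailureDemo$LookupCache
        System.out.println(trySerialize(new EnrichFunction()));
    }
}
```

Static fields are not part of an object's serialized form, which is probably why the inlined version, with the cache in static fields of the main class, did not run into this error.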

Am I right in assuming that every object a function needs must be created by the function itself?
What is the right way to instantiate these objects: in the constructor or in the function's open() method?
Can functions have non-serializable fields at all?

What are the best practices for managing dependencies in Flink programs?

Could you also point me to a tutorial for beginners?

Kind regards,
Pan

Re: Basic question about Flink programs

dyana.rose@salecycle.com
In general, I'd expect every class with state that you use in Flink to be serialised, so you should mark your classes as Serializable and set a serialVersionUID.

I have what sounds like a very similar problem to yours. I need to use a non-serializable component in my streaming Flink job; in fact, it is also a Guava cache. Only one area of the application requires this cache, i.e. the cache is only called by one class.

In the serializable class that requires access to the cache, I've handled it like any other non-serializable property: the field that references the cache container class is marked transient, and a hook into the deserialization process (https://docs.oracle.com/javase/8/docs/api/java/io/Serializable.html) initialises the cache container on deserialization.
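
A minimal sketch of the transient-plus-hook approach described above, assuming the hook is a private readObject method (one of the special methods documented on java.io.Serializable). CacheContainer and Enricher are hypothetical names, and a HashMap-backed class stands in for the Guava cache so the example has no dependencies.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class TransientCacheDemo {

    /** Hypothetical cache container; deliberately NOT Serializable, like a Guava cache. */
    static class CacheContainer {
        private final Map<String, String> entries = new HashMap<>();

        String get(String key) {
            return entries.computeIfAbsent(key, k -> "value-for-" + k);
        }

        int size() {
            return entries.size();
        }
    }

    /** Hypothetical serializable class that needs the cache. */
    static class Enricher implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: skipped by Java serialization, so it is null right after deserialization
        private transient CacheContainer cache = new CacheContainer();

        String enrich(String key) {
            return key + "=" + cache.get(key);
        }

        int cacheSize() {
            return cache.size();
        }

        // Deserialization hook: restore the non-transient fields, then build a fresh cache.
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            cache = new CacheContainer();
        }
    }

    /** Serializes and deserializes an object, as happens when it is shipped to another JVM. */
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Enricher original = new Enricher();
        original.enrich("a");                  // warms the local cache
        Enricher copy = roundTrip(original);   // succeeds because the cache field is transient
        System.out.println(copy.cacheSize());  // 0: the copy starts with a fresh, empty cache
        System.out.println(copy.enrich("b"));  // b=value-for-b
    }
}
```

The field initializer on `cache` does not run during deserialization (the constructor of a Serializable class is skipped), which is why the hook has to recreate the cache. Each deserialized copy therefore starts with an empty cache.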

(In reply to Pan Glust's post of 2018/03/16 11:35:01.)

Re: Basic question about Flink programs

KristoffSC
Hi,
I'm having the same problem now. What approach did you settle on after gaining some experience?

Also, do you use Spring DI to set up/initialize your jobs/process functions?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Basic question about Flink programs

Arvid Heise-3
Hi KristoffSC,

it would be better to open a new thread. Very few users regularly check threads on the user list that are more than a year old.

In general, if you have a cache, you usually don't want to serialize it. So add the cache as a field of the respective function (rewriting a lambda as an anonymous class) and mark the field transient so that it is not serialized. Be aware that you usually want to initialize the cache in the function's open() method, so you need to use a RichXFunction (e.g. RichMapFunction).
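
The sketch below illustrates the open()-based variant, which also answers the "constructor or open()?" question. To stay runnable without Flink on the classpath, it uses a hypothetical base class OpenableMapper and a simplified runOnWorker helper that mimic only the relevant part of the lifecycle (serialize the function, deserialize a copy, call open() once, then map each record); these are not Flink APIs. In a real job you would extend Flink's RichMapFunction and override open(Configuration parameters), the Flink 1.x signature; newer releases have moved to open(OpenContext).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OpenLifecycleDemo {

    /** Hypothetical stand-in for a rich function: open() runs on the worker before any map() call. */
    abstract static class OpenableMapper<IN, OUT> implements Serializable {
        private static final long serialVersionUID = 1L;

        void open() {}

        abstract OUT map(IN value);
    }

    /** Hypothetical non-serializable dependency (think Guava cache). */
    static class LookupCache {
        private final Map<String, String> entries = new HashMap<>();

        String get(String key) {
            return entries.computeIfAbsent(key, k -> "value-for-" + k);
        }
    }

    /** User function: the cache is transient and created in open(), not in the constructor. */
    static class CachingEnricher extends OpenableMapper<String, String> {
        private static final long serialVersionUID = 1L;

        // Not serialized; null until open() runs. Creating it in a constructor would not help,
        // because deserializing a Serializable object does not run its constructor.
        transient LookupCache cache;

        @Override
        void open() {
            cache = new LookupCache();
        }

        @Override
        String map(String key) {
            return key + "=" + cache.get(key);
        }
    }

    /** Simplified simulation of shipping a function to a worker and running it there. */
    @SuppressWarnings("unchecked")
    static <IN, OUT> List<OUT> runOnWorker(OpenableMapper<IN, OUT> fn, List<IN> records) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn); // "client side": serialize the function
            }
            OpenableMapper<IN, OUT> copy;
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy = (OpenableMapper<IN, OUT>) in.readObject(); // "worker side": deserialize a copy
            }
            copy.open(); // initialize transient resources once, before processing records
            List<OUT> results = new ArrayList<>();
            for (IN record : records) {
                results.add(copy.map(record));
            }
            return results;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<String> out = runOnWorker(new CachingEnricher(), Arrays.asList("a", "b", "a"));
        System.out.println(out); // [a=value-for-a, b=value-for-b, a=value-for-a]
    }
}
```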

Best,

Arvid

(In reply to KristoffSC's message of Fri, Dec 6, 2019 at 1:23 PM.)

Re: Basic question about Flink programs

KristoffSC
Hi Arvid Heise-3,
Thanks for your answer. I took this approach.


I did not want to start a new thread since I wanted to avoid "subject
duplication" :)

Regards,
Krzysztof


