Dependency injection and Flink

Dependency injection and Flink

santhosh venkat

Hi,  

I'm trying to integrate a dependency injection framework with Flink within my company. When I searched the user mailing list, I found the following thread that discussed this in the past: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dependency-Injection-and-Flink-td18880.html

Since that thread is ~2 years old, I'm creating this new request.

1. How do you expect users to integrate Flink with a dependency injection framework? Are there any hooks/entry points that we can use to seamlessly integrate a DI framework with Flink? How does the community recommend integrating dependency injection?

2. Would it be possible to create the objects (say, Spring beans) at Flink-task scope? Or are all the objects from a dependency injection framework expected to be created at the whole-process (JM/TM) level?

Can someone please help answer the above questions and help me understand the Flink guarantees better? Any help would be greatly appreciated.

Thanks.

Re: Dependency injection and Flink

Arvid Heise-3
Hi Santhosh,

Flink does not support automatic DI at the task level, and there is no immediate plan to support it out of the box. In general, there are quite a few implications of using automatic DI in a distributed setting; for example, how is a singleton supposed to work? Nevertheless, Flink's job startup was overhauled in the last release and the upcoming one, so it might become easier to support DI frameworks in the near future.

What I usually recommend is to use automatic DI while creating the DataStream application and then switch to manual DI at the task manager level (most folks conflate DI with automatic DI, but DI is a general pattern that is independent of any framework).

Here is an example. Suppose you want to use ServiceA in some async I/O call.

DataStream<Integer> inputStream = env.addSource(...);
AsyncFunction<Integer, String> function = new ExternalLookupFunction();
AsyncDataStream.unorderedWait(inputStream, function, 1, TimeUnit.SECONDS).print();

// AsyncFunction is an interface, so we implement it rather than extend it.
class ExternalLookupFunction implements AsyncFunction<Integer, String> {
    @Autowired
    ServiceA service; // <-- will be injected wherever the DataStream graph is created

    @Override
    public void asyncInvoke(Integer input, ResultFuture<String> resultFuture) throws IOException {
        service.call(input, resultFuture::complete); // <-- called only on the task manager
    }
}

Now the question is how ServiceA is transferred from the client/job manager to the task managers. One solution is to make ServiceA Serializable and just let Java serialization handle everything automatically. Alternatively, you can serialize only the configuration information and create the service in RichAsyncFunction#open.
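
For the second option, a minimal sketch could look like this (ServiceConfig and ServiceA.create here are just placeholders for whatever your service actually needs):

// Only the small, Serializable config travels with the job graph;
// the heavyweight service is created on the task manager in open().
class ConfiguredLookupFunction extends RichAsyncFunction<Integer, String> {
    private final ServiceConfig config;  // Serializable configuration only
    private transient ServiceA service;  // never serialized, rebuilt per task

    ConfiguredLookupFunction(ServiceConfig config) {
        this.config = config;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        service = ServiceA.create(config); // placeholder factory method
    }

    @Override
    public void asyncInvoke(Integer input, ResultFuture<String> resultFuture) {
        service.call(input, resultFuture::complete);
    }
}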

Let's see if someone else has made progress on providing the initialization hooks described in your linked thread. Note that the community is busy getting Flink 1.12 done, so it might take a while for more answers.

Best,

Arvid


Re: Dependency injection and Flink

Dan Diephouse
Just want to chime in here that it would be fantastic to have a way to do DI in Flink. Ideally the injected services themselves wouldn't get serialized at all, since they're just singletons in our case. E.g., we have an API client that looks up data from our API and caches it for all the functions that need it.
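
For illustration, what we're after is roughly a per-process lazy singleton, sketched here with a hypothetical ApiClient:

// One client per JVM (i.e., per task manager), created lazily and shared
// by every function instance in that process; nothing gets serialized.
class ApiClientHolder {
    private static volatile ApiClient client;

    static ApiClient get(ApiClientConfig config) {
        if (client == null) {
            synchronized (ApiClientHolder.class) {
                if (client == null) {
                    client = new ApiClient(config); // hypothetical client
                }
            }
        }
        return client;
    }
}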


Re: Dependency injection and Flink

Arvid Heise-3
I hope you don't mind that I'm just lazily giving you a link to Wikipedia [1]. The first few examples there all show manual DI with constructors, setters, etc.

Folks usually only think of the assembling (automatic) part when talking about DI, but you can build everything manually as well, with a bit of inconvenience. What usually happens is that your constructors end up with a huge parameter list through which all the injected dependencies are passed (and potentially handed further down the object graph).

That's also what we have in the Flink code base (although it would be better to replace the long parameter lists with parameter objects, but that's a different story). For example, consider DataStream, where we inject the environment [2].
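
Concretely, for the earlier example, manual DI just means assembling and passing the dependency yourself instead of letting a container do it; a minimal sketch (the ServiceA constructor arguments are hypothetical collaborators):

// Manual DI: the dependency is passed in explicitly via the constructor.
class ExternalLookupFunction implements AsyncFunction<Integer, String> {
    private final ServiceA service; // must be Serializable to ship with the function

    ExternalLookupFunction(ServiceA service) {
        this.service = service;
    }

    @Override
    public void asyncInvoke(Integer input, ResultFuture<String> resultFuture) {
        service.call(input, resultFuture::complete);
    }
}

// The assembly step, written by hand instead of by a DI container:
ServiceA service = new ServiceA(new HttpClient(), new Cache()); // hypothetical collaborators
AsyncFunction<Integer, String> function = new ExternalLookupFunction(service);

[1] https://en.wikipedia.org/wiki/Dependency_injection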


On Mon, Nov 9, 2020 at 9:07 PM santhosh venkat <[hidden email]> wrote:
Hi Arvid, 

Thanks a lot for your response. If I understand correctly, Flink does not support task-level DI. I completely agree with the reasons you provided (especially the singleton one). The serialization context is also something I currently handle in my app (other frameworks like Samza expect something similar from applications too).

The only thing I don't understand is what you mean by "use automatic DI while creating the DataStream application and then switch to manual DI on task manager level." I don't quite follow the distinction between the automatic and manual parts. Can you please help me understand what you mean by that?

Thanks.
