Dependency Injection and Flink

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Dependency Injection and Flink

XiaoChuan Yu
Hi,

I'm evaluating Flink with the intent to integrate it into a Java project that uses a lot of dependency injection via Guice. What would be the best way to work with DI/Guice given that injected fields aren't Serializable?
I looked at this StackOverflow answer so far. To my understanding the strategy is as follows but I'm not sure about step 3:
  1. Use a RichFunction any time injection required.
  2. Do not use @Inject, instead mark each injected field as transient.
  3. Implement open() / close() and manually assign values to injected fields using Injector.getInstance(SomeClass.class)? But where do I get the injector? Create one on the spot each time? Keep one as a static var somewhere and use everywhere?
Example:
 public class MyFilter extends FilterFunction<String> {
     private transient DbClient dbClient;
     //@Inject DbClient dbClient; //typical Guice field injection
     
     public void open(Configuration parameters) {
         // where am I suppose to get the injector?
         // keep it as a static variable somewhere and init it in Main?
         this.dbClient = MyInjectorHolder.injector().getInstance(DbClient.class);
     }     
     public boolean filter(String value) {
         return this.dbClient.query(value);
     }
 }
I haven't setup a Flink environment to try the above yet though.
Does anyone know of a less verbose way?
I imagine this could get quite verbose with multiple injected fields.

Thanks,
Xiaochuan Yu

Reply | Threaded
Open this post in threaded view
|

Re: Dependency Injection and Flink

Steven Wu
Xiaochuan,

We are doing exactly as you described. We keep the injector as a global static var. 

But we extend from FlinkJobManager and FlinkTaskManager to override main method and initialize the injector (and other things) during JVM startup, which does cause tight code coupling. It is a little painful to upgrade Flink because sometimes internal code structure change of FlinkJobManager and FlinkTaskManager can break our extended class..

Thanks,
Steven


On Tue, Mar 13, 2018 at 11:30 AM, XiaoChuan Yu <[hidden email]> wrote:
Hi,

I'm evaluating Flink with the intent to integrate it into a Java project that uses a lot of dependency injection via Guice. What would be the best way to work with DI/Guice given that injected fields aren't Serializable?
I looked at this StackOverflow answer so far. To my understanding the strategy is as follows but I'm not sure about step 3:
  1. Use a RichFunction any time injection required.
  2. Do not use @Inject, instead mark each injected field as transient.
  3. Implement open() / close() and manually assign values to injected fields using Injector.getInstance(SomeClass.class)? But where do I get the injector? Create one on the spot each time? Keep one as a static var somewhere and use everywhere?
Example:
 public class MyFilter extends FilterFunction<String> {
     private transient DbClient dbClient;
     //@Inject DbClient dbClient; //typical Guice field injection
     
     public void open(Configuration parameters) {
         // where am I suppose to get the injector?
         // keep it as a static variable somewhere and init it in Main?
         this.dbClient = MyInjectorHolder.injector().getInstance(DbClient.class);
     }     
     public boolean filter(String value) {
         return this.dbClient.query(value);
     }
 }
I haven't setup a Flink environment to try the above yet though.
Does anyone know of a less verbose way?
I imagine this could get quite verbose with multiple injected fields.

Thanks,
Xiaochuan Yu


Reply | Threaded
Open this post in threaded view
|

Re: Dependency Injection and Flink

Stephan Ewen
Would it help to be able to register "initializers", meaning some classes/methods that will be called at every process entry point, to set up something like this?


On Tue, Mar 13, 2018 at 7:56 PM, Steven Wu <[hidden email]> wrote:
Xiaochuan,

We are doing exactly as you described. We keep the injector as a global static var. 

But we extend from FlinkJobManager and FlinkTaskManager to override main method and initialize the injector (and other things) during JVM startup, which does cause tight code coupling. It is a little painful to upgrade Flink because sometimes internal code structure change of FlinkJobManager and FlinkTaskManager can break our extended class..

Thanks,
Steven


On Tue, Mar 13, 2018 at 11:30 AM, XiaoChuan Yu <[hidden email]> wrote:
Hi,

I'm evaluating Flink with the intent to integrate it into a Java project that uses a lot of dependency injection via Guice. What would be the best way to work with DI/Guice given that injected fields aren't Serializable?
I looked at this StackOverflow answer so far. To my understanding the strategy is as follows but I'm not sure about step 3:
  1. Use a RichFunction any time injection required.
  2. Do not use @Inject, instead mark each injected field as transient.
  3. Implement open() / close() and manually assign values to injected fields using Injector.getInstance(SomeClass.class)? But where do I get the injector? Create one on the spot each time? Keep one as a static var somewhere and use everywhere?
Example:
 public class MyFilter extends FilterFunction<String> {
     private transient DbClient dbClient;
     //@Inject DbClient dbClient; //typical Guice field injection
     
     public void open(Configuration parameters) {
         // where am I suppose to get the injector?
         // keep it as a static variable somewhere and init it in Main?
         this.dbClient = MyInjectorHolder.injector().getInstance(DbClient.class);
     }     
     public boolean filter(String value) {
         return this.dbClient.query(value);
     }
 }
I haven't setup a Flink environment to try the above yet though.
Does anyone know of a less verbose way?
I imagine this could get quite verbose with multiple injected fields.

Thanks,
Xiaochuan Yu



Reply | Threaded
Open this post in threaded view
|

Re: Dependency Injection and Flink

Steven Wu
Stephan, 

That would be helpful. On job manager side, entry class provides such an entry point hook. The problem is on the task manager side, where we don't have such an initialization/entry point.

I have brought up the same question 3 months ago in this list with subject "entrypoint for executing job in task manager".

Thanks,
Steven



On Thu, Mar 15, 2018 at 1:49 PM, Stephan Ewen <[hidden email]> wrote:
Would it help to be able to register "initializers", meaning some classes/methods that will be called at every process entry point, to set up something like this?


On Tue, Mar 13, 2018 at 7:56 PM, Steven Wu <[hidden email]> wrote:
Xiaochuan,

We are doing exactly as you described. We keep the injector as a global static var. 

But we extend from FlinkJobManager and FlinkTaskManager to override main method and initialize the injector (and other things) during JVM startup, which does cause tight code coupling. It is a little painful to upgrade Flink because sometimes internal code structure change of FlinkJobManager and FlinkTaskManager can break our extended class..

Thanks,
Steven


On Tue, Mar 13, 2018 at 11:30 AM, XiaoChuan Yu <[hidden email]> wrote:
Hi,

I'm evaluating Flink with the intent to integrate it into a Java project that uses a lot of dependency injection via Guice. What would be the best way to work with DI/Guice given that injected fields aren't Serializable?
I looked at this StackOverflow answer so far. To my understanding the strategy is as follows but I'm not sure about step 3:
  1. Use a RichFunction any time injection required.
  2. Do not use @Inject, instead mark each injected field as transient.
  3. Implement open() / close() and manually assign values to injected fields using Injector.getInstance(SomeClass.class)? But where do I get the injector? Create one on the spot each time? Keep one as a static var somewhere and use everywhere?
Example:
 public class MyFilter extends FilterFunction<String> {
     private transient DbClient dbClient;
     //@Inject DbClient dbClient; //typical Guice field injection
     
     public void open(Configuration parameters) {
         // where am I suppose to get the injector?
         // keep it as a static variable somewhere and init it in Main?
         this.dbClient = MyInjectorHolder.injector().getInstance(DbClient.class);
     }     
     public boolean filter(String value) {
         return this.dbClient.query(value);
     }
 }
I haven't setup a Flink environment to try the above yet though.
Does anyone know of a less verbose way?
I imagine this could get quite verbose with multiple injected fields.

Thanks,
Xiaochuan Yu