NPE when using spring bean in custom input format

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

NPE when using spring bean in custom input format

madan
Hi,

Need help in the below scenario,

I have CustomInputFormat which loads the records using a bean,

public class CustomInputFormat extends GenericInputFormat {
      private Iterator<Map<String, Object>> recordsIterator;
      @Override
        public void open(GenericInputSplit split) throws IOException {
               ServiceX serviceX = SpringBeanFinder.getBean(ServiceX.class);
                recordsIterator = serviceX.getRecords(..); 
         }
}
The above input format works fine when using Flink LocalEnvironment in spring application. Problem is when running flink in a cluster mode and trying to connect to it using RemoveEnvironment. Since Spring applicaiton context will not be initialized, NPE is thrown. Please suggest what could be the solution in this scenario.



--
Thank you,
Madan.
Reply | Threaded
Open this post in threaded view
|

Re: NPE when using spring bean in custom input format

madan
Suggestions please.

Thinking of options
1. Initilizing spring application context in the 'open' method. Instead of loading entire context, move service related beans to one/multiple packages  and scan only those packages. Requires code refactoring.
2. Direct database query - direct query cannot be used since business logic is around while fetching records
3. Write initially to csv and do transformation on csv. Last possible option.

Please share your thoughts.

Thank you.

On Wed, Jan 16, 2019 at 2:50 PM madan <[hidden email]> wrote:
Hi,

Need help in the below scenario,

I have CustomInputFormat which loads the records using a bean,

public class CustomInputFormat extends GenericInputFormat {
      private Iterator<Map<String, Object>> recordsIterator;
      @Override
        public void open(GenericInputSplit split) throws IOException {
               ServiceX serviceX = SpringBeanFinder.getBean(ServiceX.class);
                recordsIterator = serviceX.getRecords(..); 
         }
}
The above input format works fine when using Flink LocalEnvironment in spring application. Problem is when running flink in a cluster mode and trying to connect to it using RemoveEnvironment. Since Spring applicaiton context will not be initialized, NPE is thrown. Please suggest what could be the solution in this scenario.



--
Thank you,
Madan.


--
Thank you,
Madan.
Reply | Threaded
Open this post in threaded view
|

Re: NPE when using spring bean in custom input format

Piotr Nowojski-2
Hi,

You have to use `open()` method to handle initialisation of the things required by your code/operators. By the nature of the LocalEnvironment, the life cycle of the operators is different there compared to what happens when submitting a job to the real cluster. With remote environments your classes will be serialised, sent over the network and then deserialised and then `open()` methods will be called. In such setup, if you need to initialise some shared static resource, you also have to keep in mind that depending on the parallelism, number of the tasks & number of task managers, you will have to make sure that your static resource is initialised only once. You also should take care about de-initialisation of this resource & take into account what will happen if your code will crash with an exception. In that case your job might be resubmitted with out restarting the TaskManagers.

Piotrek

On 18 Jan 2019, at 11:48, madan <[hidden email]> wrote:

Suggestions please.

Thinking of options
1. Initilizing spring application context in the 'open' method. Instead of loading entire context, move service related beans to one/multiple packages  and scan only those packages. Requires code refactoring.
2. Direct database query - direct query cannot be used since business logic is around while fetching records
3. Write initially to csv and do transformation on csv. Last possible option.

Please share your thoughts.

Thank you.

On Wed, Jan 16, 2019 at 2:50 PM madan <[hidden email]> wrote:
Hi,

Need help in the below scenario,

I have CustomInputFormat which loads the records using a bean,

public class CustomInputFormat extends GenericInputFormat {
      private Iterator<Map<String, Object>> recordsIterator;
      @Override
        public void open(GenericInputSplit split) throws IOException {
               ServiceX serviceX = SpringBeanFinder.getBean(ServiceX.class);
                recordsIterator = serviceX.getRecords(..); 
         }
}
The above input format works fine when using Flink LocalEnvironment in spring application. Problem is when running flink in a cluster mode and trying to connect to it using RemoveEnvironment. Since Spring applicaiton context will not be initialized, NPE is thrown. Please suggest what could be the solution in this scenario.



--
Thank you,
Madan.


--
Thank you,
Madan.