Database connection from job

Bart Kastermans
I am using the Scala API for Flink and am trying to set up a JDBC
database connection in my job (for every incoming event I want to query
the database to get some data to enrich the event). Because the code is
serialized and deserialized when it is sent from the Flink master to the
Flink workers, I cannot just open the connection in my main method. Can
someone point me to the lifecycle methods that the worker calls to do
local initialization of the job? I have not yet been able to find any
references or examples of this in the documentation.

Thanks!

Best,
Bart

Re: Database connection from job

Stefan Richter
Hi,


Best,
Stefan

On 24. Aug 2017, at 14:12, Bart Kastermans <[hidden email]> wrote:

I am using the Scala API for Flink and am trying to set up a JDBC
database connection in my job (for every incoming event I want to query
the database to get some data to enrich the event). Because the code is
serialized and deserialized when it is sent from the Flink master to the
Flink workers, I cannot just open the connection in my main method. Can
someone point me to the lifecycle methods that the worker calls to do
local initialization of the job? I have not yet been able to find any
references or examples of this in the documentation.

Thanks!

Best,
Bart


Re: Database connection from job

Aljoscha Krettek
Hi Bart,

I think you might be interested in the (admittedly short) section of the documentation about RichFunctions: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/api_concepts.html#rich-functions

If you make your user function a RichFunction, you can implement the lifecycle methods open() and close(), which allow you to set up, for example, a database connection that you want to reuse for the lifetime of your user function.
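
For illustration, here is a minimal sketch of that pattern in the Scala API, assuming the Flink 1.x signature open(Configuration). The Event and EnrichedEvent types, the EnrichFromDb class, the JDBC URL parameter, and the "users" table with its "id" and "name" columns are all hypothetical placeholders and not part of Flink; substitute your own.

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Hypothetical event types, used only for this illustration.
case class Event(userId: String, payload: String)
case class EnrichedEvent(userId: String, payload: String, userName: Option[String])

// jdbcUrl is a plain String, so it is serialized and shipped with the function.
// The URL and the "users" table queried below are assumptions.
class EnrichFromDb(jdbcUrl: String) extends RichMapFunction[Event, EnrichedEvent] {

  // JDBC objects are not serializable, so they are marked @transient
  // and created in open() on the worker instead of in the constructor.
  @transient private var connection: Connection = _
  @transient private var lookup: PreparedStatement = _

  // Called once per parallel instance on the worker, before any call to map().
  override def open(parameters: Configuration): Unit = {
    connection = DriverManager.getConnection(jdbcUrl)
    lookup = connection.prepareStatement("SELECT name FROM users WHERE id = ?")
  }

  // Called for every incoming event; reuses the connection opened above.
  override def map(event: Event): EnrichedEvent = {
    lookup.setString(1, event.userId)
    val rs = lookup.executeQuery()
    try {
      val name = if (rs.next()) Option(rs.getString("name")) else None
      EnrichedEvent(event.userId, event.payload, name)
    } finally {
      rs.close()
    }
  }

  // Called once when the function is shut down; releases the JDBC resources.
  override def close(): Unit = {
    if (lookup != null) lookup.close()
    if (connection != null) connection.close()
  }
}
```

On a DataStream[Event] this could be applied as events.map(new EnrichFromDb(url)), with org.apache.flink.streaming.api.scala._ imported for the implicit type information. Note that this sketch issues one blocking query per event, which may become a throughput bottleneck.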

Best,
Aljoscha

On 24. Aug 2017, at 17:42, Stefan Richter <[hidden email]> wrote:

Hi,


Best,
Stefan
