How to handle JDBC connections in a topology

How to handle JDBC connections in a topology

Stephen Connolly
Hi,

So we have a number of nodes in our topology that need to check a database, e.g.:

* We need a filter step to drop events on the floor from systems we are no longer interested in
* We need a step that outputs on a side-channel if the event is for an object where the parent is not currently known to the database.

Right now, each node in the topology that needs to talk to the database grabs its own JDBC connection and stores it in a transient field (to exclude it from the serialized state).
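A minimal sketch of that per-node approach, using only the JDK. `DbBackedFilter`, its `open()`/`close()` hooks, and `TransientFieldDemo` are hypothetical stand-ins rather than Flink classes, and the "connection" is a placeholder `AutoCloseable` so the sketch runs without a database. It only illustrates that a transient field is skipped by Java serialization and has to be re-created on the receiving side.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a node in the topology (not a Flink class).
class DbBackedFilter implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String jdbcUrl;               // travels with the serialized object
    private transient AutoCloseable connection; // skipped by Java serialization

    DbBackedFilter(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    // Plays the role of an "open" lifecycle hook. Assumption: real code would
    // obtain a java.sql.Connection here instead of this placeholder.
    void open() {
        connection = () -> { };
    }

    // Plays the role of a "close" lifecycle hook.
    void close() {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception e) {
            throw new IllegalStateException("close failed", e);
        } finally {
            connection = null;
        }
    }

    boolean isOpen() {
        return connection != null;
    }

    String jdbcUrl() {
        return jdbcUrl;
    }
}

final class TransientFieldDemo {
    // Serializes and deserializes the filter, as happens when it is shipped to a worker.
    static DbBackedFilter roundTrip(DbBackedFilter filter) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(filter);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DbBackedFilter) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

After `roundTrip`, the copy keeps its `jdbcUrl` but its `connection` is `null` until `open()` is called again, which is why each node ends up holding its own connection.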

What I'd really like to do is have a JDBC connection pool shared across the entire topology, as that way the pool could check for stale connections, etc.

Does anyone have any tips for doing this kind of thing?

(My current idea is to maintain a `static final WeakHashMap<ClassLoader,ConnectionPool>` in the main class... but that feels very much like a hack)
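A rough sketch of that idea, assuming a hypothetical `ConnectionPool` class and a hypothetical `PoolRegistry` holder (neither comes from Flink or from a pooling library). It only shows the map mechanics: one pool per classloader, created lazily under a lock.

```java
import java.util.Map;
import java.util.WeakHashMap;

// Hypothetical pool type; a real job would wrap a pooling DataSource here.
final class ConnectionPool {
    final String jdbcUrl;

    ConnectionPool(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }
}

final class PoolRegistry {
    // Weak keys: an entry becomes eligible for removal once its ClassLoader
    // is no longer strongly reachable from anywhere else.
    private static final Map<ClassLoader, ConnectionPool> POOLS = new WeakHashMap<>();

    private PoolRegistry() { }

    // Returns the pool for the given classloader, creating it on first use.
    // WeakHashMap is not thread-safe, so all access goes through one lock.
    static ConnectionPool poolFor(ClassLoader loader, String jdbcUrl) {
        synchronized (POOLS) {
            return POOLS.computeIfAbsent(loader, key -> new ConnectionPool(jdbcUrl));
        }
    }
}
```

Two caveats with this sketch: `WeakHashMap` holds its values strongly, so a pool object that references its own classloader keeps the entry alive, and dropping an entry does not close the pool, so cleanup still has to be triggered explicitly.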

What I'm really looking for is some form of Node Transient State... are there any examples of this type of thing?

Flink 1.8.x

Thanks,

-Stephen

Re: How to handle JDBC connections in a topology

Stephen Connolly
Oh and I'd also need some way to clean up the per-node transient state if the topology stops running on a specific node.

(In reply to Stephen Connolly's message of Wed, 24 Jul 2019 at 08:18.)

Re: How to handle JDBC connections in a topology

Haibo Sun
Hi Stephen,

I don't think it's possible to use the same connection pool for the entire topology, because the nodes in the topology may run in different JVMs and on different machines.

If you want all operators running in the same JVM to use the same connection pool, I think you can implement a static class that holds the connection pool, and have the operators get their connections from it.
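One possible shape for such a static holder, sketched with only the JDK. `SharedPool` and its nested `Pool` are hypothetical names, and `Pool` is a placeholder that merely records whether it was closed. The reference count is an addition beyond what is described above: it is one way to get the per-JVM cleanup asked about earlier in the thread, assuming each operator calls `acquire()` when it starts (for example from a rich function's `open()`) and `release()` when it stops (for example from `close()`).

```java
// Hypothetical per-JVM holder; not part of Flink or of any pooling library.
final class SharedPool {

    // Placeholder pool that only tracks whether close() was called, so the
    // sketch can be exercised without a database.
    static final class Pool implements AutoCloseable {
        private boolean closed;

        boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    private static Pool pool;  // guarded by the SharedPool class lock
    private static int users;  // number of operators currently holding the pool

    private SharedPool() { }

    // Called by each operator when it starts; creates the pool on first use.
    static synchronized Pool acquire() {
        if (pool == null) {
            pool = new Pool();
        }
        users++;
        return pool;
    }

    // Called by each operator when it stops; the last one out closes the pool.
    static synchronized void release() {
        if (users == 0) {
            throw new IllegalStateException("release() called without a matching acquire()");
        }
        users--;
        if (users == 0) {
            pool.close();
            pool = null;
        }
    }
}
```

With this shape, operators in the same JVM and the same classloader share one pool, and a later `acquire()` after the last `release()` builds a fresh pool instead of reusing a closed one.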

Best,
Haibo

(In reply to Stephen Connolly's message of 2019-07-24 15:20:31.)

Re: How to handle JDBC connections in a topology

Chesnay Schepler

Note that for the static class approach to work, you have to ensure that the class is loaded by the parent classloader, either by placing the class in /lib or by configuring `classloader.parent-first-patterns.additional` to pick up this particular class.

(In reply to Haibo Sun's message of 24/07/2019 10:24.)