DB connection and query inside map function

Anastasios Skarlatidis
Hi! 

I am new to Apache Flink and I would like to ask what is the best way to query a relational DB inside a map function in order to enrich the streaming data. Consider, for example, that I have a KeyedStream[Int, String] and I would like to query the database based on the Int value inside a map function: `stream.map((v: Int) => <<some SQL query>>)`.

Is it possible to have a connection pool per worker node, to be used inside each map function call?

Best,

Anastasios

Re: DB connection and query inside map function

Fabian Hueske
Hi Anastasios,

that's certainly possible. The most straightforward approach is a synchronous call to the database.
Because each parallel instance of the function has only one request active at a time, you do not need a thread pool.
You can establish the connection in the open() method of a RichMapFunction. The problem with this approach is that the synchronous requests can significantly increase latency, since each record has to wait for its query to return before the next record is processed.
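A minimal sketch of the synchronous pattern, written as plain Scala so it runs without Flink on the classpath. `LifecycleMap`, `FakeConnection`, `EnrichWithName` and `Driver` are hypothetical stand-ins: in a real job the class would extend Flink's `RichMapFunction` and override `open(parameters: Configuration)`, `map` and `close()`, and `FakeConnection` would be a JDBC `Connection` (ideally with a `PreparedStatement` prepared once in `open()`).

```scala
// Hypothetical stand-in for the lifecycle Flink gives a RichMapFunction:
// open() once per parallel instance before the first record, map() per record, close() at the end.
trait LifecycleMap[IN, OUT] {
  def open(): Unit = ()
  def map(value: IN): OUT
  def close(): Unit = ()
}

// Hypothetical stand-in for a JDBC connection; the Map plays the role of a lookup table.
final class FakeConnection(table: Map[Int, String]) {
  private var closed = false
  var queries = 0 // number of synchronous queries issued on this connection

  def lookup(id: Int): Option[String] = {
    require(!closed, "connection is closed")
    queries += 1
    table.get(id) // a real client would block here on e.g. SELECT name FROM t WHERE id = ?
  }
  def close(): Unit = { closed = true }
  def isClosed: Boolean = closed
}

// One connection per parallel instance, opened in open() and reused for every record.
// Only one query is outstanding at a time, so no pool is needed.
final class EnrichWithName(connect: () => FakeConnection) extends LifecycleMap[Int, (Int, String)] {
  private var conn: FakeConnection = null
  var connectionsOpened = 0

  override def open(): Unit = { conn = connect(); connectionsOpened += 1 }
  override def map(id: Int): (Int, String) = (id, conn.lookup(id).getOrElse("unknown"))
  override def close(): Unit = if (conn != null) conn.close()
}

// Drives a single parallel instance the way a task would: open, map each record, close.
object Driver {
  def runInstance[IN, OUT](f: LifecycleMap[IN, OUT], input: Seq[IN]): List[OUT] = {
    f.open()
    try input.iterator.map(v => f.map(v)).toList
    finally f.close()
  }
}
```

Each parallel instance gets its own function object, so a job with parallelism 4 holds four connections, each issuing one blocking query at a time. That is why no pool is needed, and also why the per-record query latency adds up.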

Doing the calls asynchronously with a thread pool is not straightforward, because it has to be integrated with Flink's checkpointing mechanism: requests that are still in flight when a checkpoint is taken must not be lost.
In fact, there is ongoing work to add a special map operator that supports asynchronous calls (see FLIP-12 [1]).
We expect this to be included in the next minor release, Flink 1.2.
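To make the asynchronous alternative more concrete, here is a simplified, stdlib-only model of what an async lookup operator does: it keeps at most `capacity` requests in flight and emits results in input order. `OrderedAsyncEnrich.run` and the `lookup` function are hypothetical; this is not Flink's implementation, and it deliberately leaves out checkpointing of in-flight requests, which is the hard part mentioned above. In Flink releases that include FLIP-12, the corresponding entry points are `AsyncDataStream.orderedWait` and `AsyncDataStream.unorderedWait` together with an `AsyncFunction`.

```scala
import scala.collection.mutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

// Simplified, hypothetical model of ordered asynchronous enrichment.
// Not Flink's operator: there is no checkpointing of in-flight requests here.
object OrderedAsyncEnrich {
  /** Issues `lookup` for each element with at most `capacity` requests in flight
    * and returns the results in the same order as `input`. */
  def run[IN, OUT](input: Seq[IN], capacity: Int, lookup: IN => Future[OUT]): List[OUT] = {
    require(capacity > 0, "capacity must be positive")
    val inFlight = mutable.Queue.empty[Future[OUT]] // oldest request at the head
    val out = List.newBuilder[OUT]
    for (v <- input) {
      // Buffer full: wait for the oldest request before issuing a new one (back-pressure).
      if (inFlight.size == capacity) out += Await.result(inFlight.dequeue(), Duration.Inf)
      inFlight.enqueue(lookup(v))
    }
    // Drain the remaining requests, still in input order.
    while (inFlight.nonEmpty) out += Await.result(inFlight.dequeue(), Duration.Inf)
    out.result()
  }
}
```

The ordered variant waits on the oldest request even when later ones have already finished; an unordered variant would emit whichever request completes first, trading output order for lower latency.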

Hope this helps,
Fabian

(In reply to Anastasios Skarlatidis, 2016-11-27 22:11 GMT+01:00)