Calling external services/databases from DataStream API


Calling external services/databases from DataStream API

Diego Fustes Villadóniga

Hi all,

 

I’m working on an application that enriches network connections with geolocations, using the GeoIP database, which is stored as a file in HDFS. To do so, I map every connection in my stream using this function:

 

def enrichIp(ip: String): Location = {
  val location = service.getLocation(ip)
  Location(location.countryName, Some(location.city), Some(latlon))
}

 

The service variable is of type LookupService, declared this way:

 

private lazy val service = new LookupService(Paths.get(path).toFile, LookupService.GEOIP_MEMORY_CACHE)
 

I can see several problems with this architecture. First, I need to manually copy the file from HDFS to the local filesystem before starting the streaming job. This could be solved with something like a DistributedCache, but it is not available in the DataStream API, or at least I can’t see it.

 

Furthermore, I need to load the full GeoIP database into memory for every Flink task. This might be OK in terms of performance, but the memory consumption is quite high. The other alternative I see is to load the GeoIP file into an external database (Redis, Postgres, etc.) and query it in real time, which might also be a good solution… Is there any built-in mechanism to do this?
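As a sketch of what such an in-memory lookup could look like, keyed by IP range rather than exact IP (plain Java, TreeMap-based; all names here are hypothetical, not the LookupService API):

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory range table: maps the start of each IPv4 range
// (as an unsigned 32-bit value) to its inclusive end and a location label.
class GeoRangeTable {
    private static class Range {
        final long end;
        final String location;
        Range(long end, String location) { this.end = end; this.location = location; }
    }

    private final TreeMap<Long, Range> ranges = new TreeMap<>();

    void addRange(String startIp, String endIp, String location) {
        ranges.put(ipToLong(startIp), new Range(ipToLong(endIp), location));
    }

    // Convert dotted-quad IPv4 text to an unsigned 32-bit value.
    static long ipToLong(String ip) {
        long result = 0;
        for (String part : ip.split("\\.")) {
            result = (result << 8) | Integer.parseInt(part);
        }
        return result;
    }

    // Find the range whose start is <= ip, then check its end covers the ip.
    String lookup(String ip) {
        long value = ipToLong(ip);
        Map.Entry<Long, Range> entry = ranges.floorEntry(value);
        if (entry != null && value <= entry.getValue().end) {
            return entry.getValue().location;
        }
        return null;
    }
}
```

With floorEntry each lookup is a single O(log n) probe, which is also the shape an external store (e.g. a Redis sorted set) would need to support for range-based enrichment.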

 

 

Kind regards,

 

Diego


Re: Calling external services/databases from DataStream API

Jonas Gröger
I have a similar use case where I (for the purposes of this discussion) have a GeoIP database that is not fully available from the start but will eventually be "full". The GeoIP tuples come in one after another; after ~4M tuples the GeoIP database is complete.

I also need to do the same query.

The way I do it right now is that I connect the two streams using ipStream.connect(geoIpStream).flatMap(CODE), where in CODE I either look up the country (flatMap1) or update the GeoIP state (flatMap2). For the state I use a ValueStateDescriptor of type GeoIPDatabase, which contains all GeoIP information.

An alternative approach would be to have the Database in a file in the filesystem (in-memory preferably) and then load it in the enrich operator.
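The connected-stream pattern above can be sketched without Flink as a plain class with the two callbacks (a HashMap stands in for the ValueState; names are illustrative, not the actual job code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java sketch of the connected-stream pattern: one callback enriches
// incoming IPs against the current state, the other absorbs GeoIP updates.
class GeoEnricher {
    private final Map<String, String> geoState = new HashMap<>();

    // Corresponds to flatMap2: a GeoIP tuple arrives and updates the state.
    void onGeoUpdate(String ip, String country) {
        geoState.put(ip, country);
    }

    // Corresponds to flatMap1: an event arrives and is enriched if possible.
    // Empty result means the database has no entry yet for this IP, which is
    // exactly the "not fully available from the start" situation above.
    Optional<String> onEvent(String ip) {
        return Optional.ofNullable(geoState.get(ip));
    }
}
```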

Re: Calling external services/databases from DataStream API

Stephan Ewen
Hi!

The Distributed cache would actually indeed be nice to add to the DataStream API. Since the runtime parts for that are all in place, the code would be mainly on the "client" side that sets up the JobGraph to be submitted and executed.

For the problem of scaling this, there are two solutions that I can see:

(1) Simpler: Use the new asynchronous I/O operator to talk with the external database in an asynchronous fashion (that should help to get higher throughput) https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html

(2) More elaborate: Convert the lookup database into a "changelog stream" and make the enrichment operation a "stream-to-stream" join.
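For illustration, the idea behind option (1) — issuing lookups concurrently instead of blocking on each round-trip — can be sketched with plain CompletableFuture, independent of Flink's async I/O operator (lookupCountry is a stand-in for a real client call):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Sketch of the async-I/O idea: fire off all lookups concurrently and
// collect results as the futures complete, instead of paying one full
// round-trip of latency per record.
class AsyncLookup {
    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    // Placeholder for a network call to the external lookup database.
    private String lookupCountry(String ip) {
        return ip.startsWith("10.") ? "internal" : "external";
    }

    List<String> enrichAll(List<String> ips) {
        // Issue every lookup without waiting for the previous one...
        List<CompletableFuture<String>> futures = ips.stream()
            .map(ip -> CompletableFuture.supplyAsync(() -> lookupCountry(ip), pool))
            .collect(Collectors.toList());
        // ...then join in input order, so results stay ordered while the
        // total latency is roughly one round-trip, not the sum of all.
        return futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    }

    void shutdown() { pool.shutdown(); }
}
```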

Greetings,
Stephan




RE: Calling external services/databases from DataStream API

Diego Fustes Villadóniga

Hi Stephan,

 

Thanks a lot for your response. I’ll study the options you mention. I’m not sure if the “changelog stream” will be easy to implement, since the lookup is based on matching IP ranges rather than exact keys.

 

Regards,

 

Diego

 



Re: Calling external services/databases from DataStream API

Fabian Hueske
Hi Diego,

you can also broadcast a changelog stream:

DataStream<X> mainStream = ...
DataStream<Y> changeStream = ...

mainStream.connect(changeStream.broadcast()).flatMap(new YourCoFlatMapFunction());

All records of the changeStream will be forwarded to each instance of the flatmap operator.
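A toy model of those broadcast semantics in plain Java (a HashMap per parallel instance stands in for operator state; names are illustrative):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of broadcast semantics: a main-stream record reaches only the
// instance its key hashes to, while a change-stream record is forwarded to
// every instance, so each instance holds a full copy of the lookup table.
class BroadcastModel {
    private final List<Map<String, String>> instanceState = new ArrayList<>();

    BroadcastModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            instanceState.add(new HashMap<>());
        }
    }

    // Change record: applied to all instances (the broadcast path).
    void onChange(String key, String value) {
        for (Map<String, String> state : instanceState) {
            state.put(key, value);
        }
    }

    // Main record: routed to one instance, which still sees the full table.
    String onMainRecord(String key) {
        int instance = Math.floorMod(key.hashCode(), instanceState.size());
        return instanceState.get(instance).get(key);
    }
}
```

The price of this pattern is the same one raised earlier in the thread: every parallel instance keeps its own copy of the table in memory.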

Best, Fabian
