Calling external services/databases from DataStream API

Posted by Diego Fustes Villadóniga on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Calling-external-services-databases-from-DataStream-API-tp11366.html

Hi all,

I’m working on an application that enriches network connections with geolocations, using the GeoIP database, which is stored as a file in HDFS. To do so, I map every connection in my stream using this function:

def enrichIp(ip: String): Location = {
  // Look up the GeoIP record for this address
  val location = service.getLocation(ip)
  // note: latlon is not defined in this snippet
  Location(location.countryName, Some(location.city), Some(latlon))
}
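
The function assumes that every lookup succeeds. If the lookup can return null for addresses that are missing from the database (or a record without a city), a variant that returns an Option[Location] avoids a NullPointerException inside the map. This is a minimal sketch in which GeoRecord, the shape of Location and the lookup function are all hypothetical stand-ins for the GeoIP library types and the application's own case class:

```scala
// Hypothetical stand-in for the record returned by the GeoIP lookup;
// the real library type and its field names may differ.
final case class GeoRecord(countryName: String, city: String, latitude: Float, longitude: Float)

// Hypothetical shape of the enriched output type.
final case class Location(country: String, city: Option[String], latLon: Option[(Float, Float)])

object Enrich {
  // Wraps the possibly-null lookup result in an Option, so unknown IPs yield
  // None instead of a NullPointerException; a null city becomes None as well.
  def enrichIp(lookup: String => GeoRecord)(ip: String): Option[Location] =
    Option(lookup(ip)).map { rec =>
      Location(rec.countryName, Option(rec.city), Some((rec.latitude, rec.longitude)))
    }
}
```

Returning an Option also fits a flatMap on the stream if unresolved addresses should simply be dropped.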

The variable service is of type LookupService and is declared as follows:

private lazy val service = new LookupService(Paths.get(path).toFile, LookupService.GEOIP_MEMORY_CACHE)
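
With this declaration the lazy val belongs to the function object. Each parallel instance of the function (each Flink task) therefore builds its own LookupService, which is where the per-task memory cost comes from. A commonly used variation is to hold the service in a Scala object. It is then built once per classloader and shared by the tasks of the job that run in the same TaskManager, assuming they share the job's user-code classloader, as is usually the case. The sketch below uses a hypothetical GeoDb class in place of LookupService. A shared instance is read by several task threads at once, so it must be safe for concurrent reads. Whether LookupService meets that requirement should be checked against its documentation.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical placeholder for an expensive lookup structure,
// standing in for LookupService in this sketch.
final class GeoDb(val entries: Map[String, String])

object GeoDbHolder {
  // Counts how often the database is built; for illustration only.
  val loads = new AtomicInteger(0)

  // A lazy val in an object is initialized at most once per classloader, and
  // Scala synchronizes that initialization, so concurrent first accesses
  // from several task threads still build a single instance.
  lazy val db: GeoDb = {
    loads.incrementAndGet()
    new GeoDb(Map("8.8.8.8" -> "United States")) // stand-in for reading the GeoIP file
  }
}
```

Inside the map function, GeoDbHolder.db would then take the place of the per-instance service field.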

I can see several problems with this architecture. First, I need to manually copy the file from HDFS to the local filesystem before starting the streaming job. This would be solved by something like a DistributedCache, but it is not available in the DataStream API, or at least I can’t find it.

Furthermore, every Flink task has to load the full GeoIP database into memory. This might be fine in terms of performance, but the memory consumption is quite high. The other alternative I see is to load the GeoIP file into an external database (Redis, Postgres, etc.) and query it in real time, which might also be a good solution… Is there any built-in mechanism to do this?
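
With the external-database alternative, querying the store once per record can become the bottleneck. A common compromise is a small bounded cache in each task, so that only recently seen addresses are held in memory. This is a minimal sketch that assumes a hypothetical remote function wrapping the actual Redis or Postgres query. It uses java.util.LinkedHashMap in access order as an LRU cache. It is not thread-safe, which is acceptable if each parallel operator instance owns its own cache. Flink also provides an Async I/O API (AsyncDataStream) for issuing such queries without blocking, but it is not shown here.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Bounded LRU cache in front of a remote lookup. `remote` is a hypothetical
// function wrapping the actual Redis/Postgres query; it is not a Flink API.
// Not thread-safe: intended to be owned by a single operator instance.
final class CachedLookup[K, V](capacity: Int, remote: K => V) {
  // accessOrder = true keeps entries ordered from least to most recently
  // accessed, so removeEldestEntry evicts the least recently used key.
  private val cache: JLinkedHashMap[K, V] = new JLinkedHashMap[K, V](16, 0.75f, true) {
    override protected def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean =
      size() > capacity
  }

  private var misses = 0

  // Number of lookups that had to go to the remote store.
  def remoteCalls: Int = misses

  def get(key: K): V =
    if (cache.containsKey(key)) cache.get(key) // hit: get() also refreshes recency
    else {
      misses += 1
      val value = remote(key)
      cache.put(key, value) // may evict the least recently used entry
      value
    }
}
```

The capacity bounds memory per task independently of the size of the GeoIP database. The hit rate then depends on how often the same addresses recur in the stream.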

Kind regards,

Diego