Making external calls from a FlinkKafkaPartitioner

Posted by Ron Crocker on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Making-external-calls-from-a-FlinkKafkaPartitioner-tp16530.html

We have a system where the Kafka partition a message goes into is a function of a value in the message. Usually it’s value % #partitions, but for some values it isn’t: those values map to a specified list of partitions that changes over time. Our “simple Java library” that produces messages for this system also runs a background thread that periodically polls an HTTP endpoint (once per minute by default) to refresh that list of special cases.
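The partitioning rule can be pulled out as a pure function and tested separately from Flink. This is a minimal sketch, assuming the special-case lists hold partition numbers, as they do in the partitioner code in this post. `PartitionRule` and `choosePartition` are hypothetical names. It uses `Math.floorMod` rather than `%` because `%` returns a negative result for a negative key.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper illustrating the rule: special-case list if present, else key mod #partitions. */
final class PartitionRule {
    private PartitionRule() {}

    static int choosePartition(long key, int numPartitions, Map<Long, List<Integer>> specialCases) {
        List<Integer> candidates = specialCases.get(key);
        if (candidates != null && !candidates.isEmpty()) {
            // Spread records for a special key randomly across its configured partitions.
            int pick = candidates.get(ThreadLocalRandom.current().nextInt(candidates.size()));
            // Wrap around if a stale list names a partition beyond the current count.
            return Math.floorMod(pick, numPartitions);
        }
        // floorMod keeps the result in [0, numPartitions) even for negative keys.
        return (int) Math.floorMod(key, (long) numPartitions);
    }
}
```

For example, with 4 partitions a key of 10 maps to partition 2 and a key of -1 maps to partition 3, while a key with the list [1, 2, 3] lands on one of those three.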

It’s easy to create a FlinkKafkaPartitioner that does the mod operation; what I’m not so sure about is how to get this polling operation into the partitioner. I’m about to try it the obvious way (create a background thread that polls the URL and updates the partition map), but I wonder if that’s actually going to cause a bunch of problems for the Flink runtime.
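One common shape for the “obvious way” is to keep the map behind a `volatile` reference, refresh it from a single daemon thread, and publish a complete, never-mutated map on each refresh so readers need no lock. This is a minimal sketch under those assumptions, not Flink API: `RefreshingPartitionMap` is a hypothetical class, and the `Supplier` stands in for the HTTP call. The try/catch matters because an exception escaping a `scheduleAtFixedRate` task suppresses all of its later runs.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical holder that refreshes a partition map on one daemon thread. */
final class RefreshingPartitionMap implements AutoCloseable {
    // volatile: a map published by the refresh thread becomes visible to readers.
    private volatile Map<Long, List<Integer>> current = Collections.emptyMap();
    private final ScheduledExecutorService executor;

    RefreshingPartitionMap(Supplier<Map<Long, List<Integer>>> fetcher, long intervalMillis) {
        executor = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "partition-map-refresh");
            thread.setDaemon(true); // never keeps the JVM alive on its own
            return thread;
        });
        // Initial delay 0: load the special cases immediately, then every interval.
        executor.scheduleAtFixedRate(() -> refresh(fetcher), 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    private void refresh(Supplier<Map<Long, List<Integer>>> fetcher) {
        try {
            // Publish an unmodifiable snapshot; readers never see a half-built map.
            current = Collections.unmodifiableMap(fetcher.get());
        } catch (RuntimeException e) {
            // Keep the last good map and retry on the next tick instead of
            // letting the exception cancel the schedule.
        }
    }

    Map<Long, List<Integer>> snapshot() {
        return current;
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

`close()` is there for callers that have a lifecycle hook; the daemon flag covers the case where nothing calls it. Each parallel sink subtask opens its own partitioner instance, so with parallelism N the endpoint is polled N times per interval.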

Here’s the code that I have right now:
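    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

    public class EventInsertPartitioner extends KafkaPartitioner<Tuple2<Long, String>> {
        private final String partitionerURL;
        private final long updateIntervalInMillis;
        // volatile: the refresh thread swaps in a new map and partition() must see it.
        private volatile Map<Long, List<Integer>> partitionMap;
        // transient: the executor is created in open() after deserialization and is not Serializable.
        private transient ScheduledExecutorService executor;

        public EventInsertPartitioner(String partitionerURL, long updateIntervalInMillis) {
            this.partitionerURL = partitionerURL;
            this.updateIntervalInMillis = updateIntervalInMillis;
            this.partitionMap = new HashMap<>();
        }

        @Override
        public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
            // Daemon thread, so the poller can never keep the JVM alive on its own.
            executor = Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "partition-map-refresh");
                thread.setDaemon(true);
                return thread;
            });
            // Initial delay 0 loads the special cases now instead of one interval later.
            executor.scheduleAtFixedRate(
                    this::updatePartitionMapRunnable,
                    0,
                    updateIntervalInMillis,
                    TimeUnit.MILLISECONDS);
        }

        private void updatePartitionMapRunnable() {
            try {
                // Make synchronous request to partitionerURL
                // This is a simple JSON that matches our data
                String response = "{1:[1,2,3],2:[2]}";
                // Replace the current partitionMap with a new map built from the response.
                // Swapping the reference needs no lock because partitionMap is volatile
                // and a published map is never mutated afterwards.
                this.partitionMap = convertResponseToMap(response);
            } catch (RuntimeException e) {
                // An exception escaping a scheduleAtFixedRate task cancels all later runs,
                // so keep the last good map and retry on the next tick.
            }
        }

        private Map<Long, List<Integer>> convertResponseToMap(String response) {
            Map<Long, List<Integer>> hashMap = new HashMap<>();
            // Convert response to JSON structure and just use that?
            // or Iterate and add to local hashMap
            return hashMap;
        }

        @Override
        public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
            long myKey = next.f0;
            // A single get() (instead of containsKey() then get()) reads the map once,
            // so a concurrent swap cannot slip in between two lookups.
            List<Integer> partitions = partitionMap.get(myKey);

            // Guard against an empty list: nextInt(0) would throw.
            if (partitions != null && !partitions.isEmpty()) {
                myKey = partitions.get(ThreadLocalRandom.current().nextInt(partitions.size()));
            }

            // floorMod keeps the result non-negative even for negative keys.
            return (int) Math.floorMod(myKey, (long) numPartitions);
        }
    }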
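For the `convertResponseToMap` placeholder: the sample response `{1:[1,2,3],2:[2]}` is not strict JSON, because JSON object keys must be quoted strings, so a standard JSON parser would reject it unless configured to be lenient. This is a minimal sketch assuming the endpoint returns exactly that shape (quoted keys are also accepted). `PartitionMapParser` is a hypothetical name. It skips anything it does not recognize rather than failing, so against a strict-JSON endpoint a real JSON library would be the more robust choice.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser for responses shaped like {1:[1,2,3],2:[2]}. */
final class PartitionMapParser {
    // One entry: an integer key (optionally quoted), a colon, then a bracketed integer list.
    private static final Pattern ENTRY = Pattern.compile("\"?(-?\\d+)\"?\\s*:\\s*\\[([^\\]]*)\\]");

    private PartitionMapParser() {}

    static Map<Long, List<Integer>> parse(String response) {
        Map<Long, List<Integer>> result = new HashMap<>();
        Matcher matcher = ENTRY.matcher(response);
        while (matcher.find()) {
            List<Integer> partitions = new ArrayList<>();
            for (String token : matcher.group(2).split(",")) {
                String trimmed = token.trim();
                if (!trimmed.isEmpty()) { // tolerates an empty list "[]"
                    partitions.add(Integer.parseInt(trimmed));
                }
            }
            result.put(Long.parseLong(matcher.group(1)), Collections.unmodifiableList(partitions));
        }
        // Unmodifiable, so the published map can be swapped in without locking.
        return Collections.unmodifiableMap(result);
    }
}
```

With this in place, `convertResponseToMap(response)` could simply return `PartitionMapParser.parse(response)`.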
Ron
Ron Crocker
Principal Engineer & Architect
New Relic