> We have a system where the Kafka partition a message should go into is a
> function of a value in the message. Often, it’s value % # partitions, but
> for some values it’s not; instead it’s a specified list of partitions that
> changes over time. Our “simple Java library” that produces messages for this
> system also has a background thread that periodically polls an HTTP endpoint
> (once a minute by default) to refresh that list of special cases.
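To pin down the routing rule, here is a minimal sketch of it as a pure function. `RoutingRuleSketch` and `choosePartition` are hypothetical names. The sketch assumes the special-case entries are partition IDs that get reduced modulo the partition count just like ordinary keys, and it uses `Math.floorMod` instead of `%` so a negative key can't yield a negative partition number. The `main` method needs Java 9+ for `Map.of`/`List.of`.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

public class RoutingRuleSketch {

    // Hypothetical helper: keys listed in specialCases go to one of their
    // configured partitions (picked at random); every other key falls back
    // to key mod numPartitions.
    public static int choosePartition(long key, Map<Long, List<Integer>> specialCases,
                                      int numPartitions) {
        List<Integer> candidates = specialCases.get(key);
        if (candidates != null && !candidates.isEmpty()) {
            int pick = candidates.get(ThreadLocalRandom.current().nextInt(candidates.size()));
            // Reduce the pick too, in case a stale list names a partition
            // number that is larger than the current partition count.
            return Math.floorMod(pick, numPartitions);
        }
        // floorMod keeps the result in [0, numPartitions) even for negative keys.
        return (int) Math.floorMod(key, (long) numPartitions);
    }

    public static void main(String[] args) {
        Map<Long, List<Integer>> special = Map.of(1L, List.of(1, 2, 3), 2L, List.of(2));
        System.out.println(choosePartition(7L, special, 4));  // 3
        System.out.println(choosePartition(-7L, special, 4)); // 1 (plain % would give -3)
        System.out.println(choosePartition(2L, special, 4));  // 2
    }
}
```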
>
> It’s easy to create a FlinkKafkaPartitioner that does the mod operation;
> what I’m not so sure about is how to get this polling operation into the
> partitioner. I’m about to try it the obvious way (create a background
> thread that polls the URL and updates the partition map), but I wonder if
> that’s actually going to cause a bunch of problems for the Flink runtime.
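The obvious way can work, but a few plain-Java pitfalls apply wherever the thread runs. Below is a minimal sketch of a refresher, assuming the HTTP call is wrapped in a `Supplier` (`PartitionMapRefresher`, `current()` and the thread name are hypothetical, not Flink API). It publishes each new map through a `volatile` field so the sink thread actually sees the update; it catches exceptions, because per the `ScheduledExecutorService.scheduleAtFixedRate` contract one thrown exception suppresses every later run; and it marks the thread as a daemon so it cannot keep the JVM alive.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper, not part of Flink: holds the latest special-case map
// and refreshes it on a single background thread.
public class PartitionMapRefresher implements AutoCloseable {
    private final ScheduledExecutorService executor;
    // volatile so writes from the refresh thread are visible to readers
    private volatile Map<Long, List<Integer>> current = Collections.emptyMap();

    public PartitionMapRefresher(Supplier<Map<Long, List<Integer>>> fetch, long intervalMillis) {
        this.executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "partition-map-refresher");
            t.setDaemon(true); // don't block JVM shutdown
            return t;
        });
        // Initial delay 0: fetch immediately instead of waiting one interval.
        executor.scheduleAtFixedRate(() -> {
            try {
                // Swap in a fully built, read-only map in one reference write.
                current = Collections.unmodifiableMap(fetch.get());
            } catch (RuntimeException e) {
                // Keep the previous map; an escaping exception would cancel
                // all future runs of this task.
                System.err.println("partition map refresh failed: " + e);
            }
        }, 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    public Map<Long, List<Integer>> current() {
        return current;
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

Two Flink-specific points are worth checking. Each parallel sink instance opens its own copy of the partitioner, so with parallelism N you would presumably get N pollers hitting the endpoint. And the partitioner interface used here does not appear to offer a `close()` callback, so a thread started in `open()` may outlive a cancelled or restarted task unless something else shuts it down.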
>
> Here’s the code that I have right now:
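> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.concurrent.Executors;
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.ThreadLocalRandom;
> import java.util.concurrent.TimeUnit;
>
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
>
> public class EventInsertPartitioner extends KafkaPartitioner<Tuple2<Long, String>> {
>     private final String partitionerURL;
>     private final long updateIntervalInMillis;
>     // volatile: written by the refresh thread, read by the task thread
>     private volatile Map<Long, List<Integer>> partitionMap;
>     // transient: created in open() on the task side, never serialized
>     private transient ScheduledExecutorService executor;
>
>     public EventInsertPartitioner(String partitionerURL, long updateIntervalInMillis) {
>         this.partitionerURL = partitionerURL;
>         this.updateIntervalInMillis = updateIntervalInMillis;
>         this.partitionMap = new HashMap<>();
>     }
>
>     @Override
>     public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
>         executor = Executors.newScheduledThreadPool(1);
>         executor.scheduleAtFixedRate(
>             this::updatePartitionMapRunnable,
>             updateIntervalInMillis,
>             updateIntervalInMillis,
>             TimeUnit.MILLISECONDS);
>     }
>
>     private void updatePartitionMapRunnable() {
>         try {
>             // Make synchronous request to partitionerURL
>             // This is a simple JSON that matches our data
>             String response = "{\"1\":[1,2,3],\"2\":[2]}";
>             // Replace current partitionMap with new HashMap from the response.
>             // Swapping the reference to a fully built map needs no locking,
>             // but the field must be volatile for the task thread to see it.
>             this.partitionMap = convertResponseToMap(response);
>         } catch (Exception e) {
>             // An exception escaping this task would stop all future refreshes,
>             // so keep the previous map (log e in real code) and retry next tick.
>         }
>     }
>
>     private Map<Long, List<Integer>> convertResponseToMap(String response) {
>         Map<Long, List<Integer>> hashMap = new HashMap<>();
>         // Convert response to JSON structure and just use that?
>         // or Iterate and add to local hashMap
>         return hashMap;
>     }
>
>     @Override
>     public int partition(Tuple2<Long, String> next, byte[] serializedKey,
>             byte[] serializedValue, int numPartitions) {
>         long myKey = next.f0;
>         // Read the volatile field once so a concurrent swap can't change
>         // the map between the lookup and the use
>         Map<Long, List<Integer>> currentMap = partitionMap;
>         List<Integer> partitions = currentMap.get(myKey);
>         if (partitions != null && !partitions.isEmpty()) {
>             myKey = partitions.get(ThreadLocalRandom.current().nextInt(partitions.size()));
>         }
>         // floorMod keeps the result non-negative even for negative keys
>         return (int) Math.floorMod(myKey, (long) numPartitions);
>     }
> }
>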
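For `convertResponseToMap`, a JSON library such as Jackson is the robust choice. If the endpoint really only ever returns a flat object of integer keys to integer arrays, a dependency-free sketch could look like the following. `PartitionMapParser` is a hypothetical name; it accepts both the quoted keys that valid JSON requires and bare keys like `{1:[1,2,3],2:[2]}`, and it makes no attempt to handle nesting, escapes, or malformed input.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PartitionMapParser {
    // Matches one `key: [n, n, ...]` entry; the key may be quoted ("1") or bare (1).
    private static final Pattern ENTRY =
            Pattern.compile("\"?(-?\\d+)\"?\\s*:\\s*\\[([^\\]]*)\\]");

    // Assumes a flat object mapping integer keys to integer arrays, nothing nested.
    public static Map<Long, List<Integer>> parse(String response) {
        Map<Long, List<Integer>> result = new HashMap<>();
        Matcher m = ENTRY.matcher(response);
        while (m.find()) {
            List<Integer> partitions = new ArrayList<>();
            for (String part : m.group(2).split(",")) {
                String trimmed = part.trim();
                if (!trimmed.isEmpty()) { // tolerate empty arrays like []
                    partitions.add(Integer.parseInt(trimmed));
                }
            }
            result.put(Long.parseLong(m.group(1)), Collections.unmodifiableList(partitions));
        }
        return Collections.unmodifiableMap(result);
    }

    public static void main(String[] args) {
        System.out.println(parse("{\"1\":[1,2,3],\"2\":[2]}"));
    }
}
```

Returning an unmodifiable map fits the swap-the-whole-reference approach: the partitioning thread only ever reads a map that was fully built before publication and is never mutated afterwards.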
> Ron
> —
> Ron Crocker
> Principal Engineer & Architect
> ( ( •)) New Relic
>
> M: +1 630 363 8835