import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/**
 * Reflection helpers that reach into non-public members of a {@link KafkaProducer}.
 *
 * <p>Every reflective failure (missing member, illegal access, or an exception thrown by the
 * invoked method) is reported as a {@link RuntimeException} whose message is
 * {@code "Incompatible KafkaProducer version"} and whose cause is the original exception.
 *
 * <p>Field and method lookups start at the given class and climb the superclass chain, so members
 * declared on a parent class are found as well.
 */
public class Util {

    /** Message attached to every wrapped reflection failure. */
    private static final String INCOMPATIBLE_VERSION = "Incompatible KafkaProducer version";

    /** Producer whose internals are accessed; must be assigned before {@link #flushNewPartitions()}. */
    static KafkaProducer kafkaProducer;

    /**
     * Enqueues a request for the partitions held in the transaction manager's
     * {@code newPartitionsInTransaction} collection (if any), reflectively calls {@code wakeup} on
     * the producer's {@code sender} field, and then waits on the request result via
     * {@code await()}.
     *
     * @throws IllegalStateException if {@code kafkaProducer} has not been assigned
     * @throws RuntimeException if any of the accessed internals is missing or inaccessible
     */
    public static void flushNewPartitions() {
        if (kafkaProducer == null) {
            // Fail with a clear message instead of an anonymous NPE deep inside getValue().
            throw new IllegalStateException("kafkaProducer has not been set");
        }
        TransactionalRequestResult result = enqueueNewPartitions();
        Object sender = getValue(kafkaProducer, "sender");
        invoke(sender, "wakeup");
        result.await();
    }

    /**
     * Builds and enqueues the handler returned by the transaction manager's
     * {@code addPartitionsToTransactionHandler} method when {@code newPartitionsInTransaction} is
     * non-empty; otherwise returns a result that is already marked done.
     *
     * @return the {@code result} field of the enqueued handler, or an already-completed result
     */
    private static TransactionalRequestResult enqueueNewPartitions() {
        Object transactionManager = getValue(kafkaProducer, "transactionManager");
        // The check and the enqueue happen under the transaction manager's monitor so they are
        // performed as one step with respect to other code synchronizing on the same object.
        synchronized (transactionManager) {
            Object newPartitionsInTransaction =
                    getValue(transactionManager, "newPartitionsInTransaction");
            Object isEmpty = invoke(newPartitionsInTransaction, "isEmpty");
            if (isEmpty instanceof Boolean && !((Boolean) isEmpty)) {
                Object txnRequestHandler =
                        invoke(transactionManager, "addPartitionsToTransactionHandler");
                // enqueueRequest is looked up with the handler's direct superclass as its
                // parameter type, and the "result" field is read starting from that same class.
                Class<?> handlerBaseType = txnRequestHandler.getClass().getSuperclass();
                invoke(
                        transactionManager,
                        "enqueueRequest",
                        new Class<?>[] {handlerBaseType},
                        new Object[] {txnRequestHandler});
                return (TransactionalRequestResult)
                        getValue(txnRequestHandler, handlerBaseType, "result");
            }
            // Nothing to send: hand back a result that is already complete so the caller's
            // await() has something to wait on. NOTE(review): the operation name presumably
            // mirrors the one Kafka uses for the real handler - confirm against the Kafka version.
            TransactionalRequestResult result = new TransactionalRequestResult("AddPartitionsToTxn");
            result.done();
            return result;
        }
    }

    /**
     * Resolves an enum constant from a string of the form {@code <enum class name>.<CONSTANT>},
     * splitting at the last dot.
     *
     * @param enumFullName class name and constant name joined by a dot
     * @return the enum constant, or {@code null} when {@code enumFullName} contains no dot or ends
     *     with one
     * @throws RuntimeException if the class cannot be loaded
     * @throws IllegalArgumentException if the class is not an enum or has no such constant (thrown
     *     by {@link Enum#valueOf(Class, String)})
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // enum type is only known at runtime
    protected static Enum getEnum(String enumFullName) {
        int lastDot = enumFullName.lastIndexOf('.');
        if (lastDot < 0 || lastDot == enumFullName.length() - 1) {
            return null;
        }
        String enumClassName = enumFullName.substring(0, lastDot);
        String enumName = enumFullName.substring(lastDot + 1);
        try {
            Class cl = Class.forName(enumClassName);
            return Enum.valueOf(cl, enumName);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(INCOMPATIBLE_VERSION, e);
        }
    }

    /**
     * Reflectively invokes {@code methodName} on {@code object}, using the runtime class of each
     * argument as the parameter type for the lookup.
     *
     * <p>Because the lookup requires an exact parameter-type match, this only finds methods whose
     * declared parameter types equal the arguments' runtime classes.
     *
     * @param object target instance
     * @param methodName name of the method to call
     * @param args arguments to pass; none may be {@code null}
     * @return whatever the invoked method returns
     * @throws NullPointerException if an element of {@code args} is {@code null}
     * @throws RuntimeException if the method is missing, inaccessible, or throws
     */
    protected static Object invoke(Object object, String methodName, Object... args) {
        Class<?>[] argTypes = new Class<?>[args.length];
        for (int i = 0; i < args.length; i++) {
            if (args[i] == null) {
                throw new NullPointerException(
                        "Cannot infer the parameter type of null argument "
                                + i
                                + " for method '"
                                + methodName
                                + "'");
            }
            argTypes[i] = args[i].getClass();
        }
        return invoke(object, methodName, argTypes, args);
    }

    /**
     * Reflectively invokes the method with the given name and exact parameter types, making it
     * accessible first.
     */
    private static Object invoke(
            Object object, String methodName, Class<?>[] argTypes, Object[] args) {
        try {
            Method method = findMethod(object.getClass(), methodName, argTypes);
            method.setAccessible(true);
            return method.invoke(object, args);
        } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
            throw new RuntimeException(INCOMPATIBLE_VERSION, e);
        }
    }

    /**
     * Reads the named field from {@code object}, regardless of the field's visibility.
     *
     * @param object instance to read from
     * @param fieldName name of the field
     * @return the field's current value
     * @throws RuntimeException if the field is missing or inaccessible
     */
    protected static Object getValue(Object object, String fieldName) {
        return getValue(object, object.getClass(), fieldName);
    }

    /** Reads the named field from {@code object}, starting the lookup at {@code clazz}. */
    private static Object getValue(Object object, Class<?> clazz, String fieldName) {
        try {
            Field field = findField(clazz, fieldName);
            field.setAccessible(true);
            return field.get(object);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new RuntimeException(INCOMPATIBLE_VERSION, e);
        }
    }

    /**
     * Writes {@code value} into the named field of {@code object}, regardless of the field's
     * visibility.
     *
     * @param object instance to modify
     * @param fieldName name of the field
     * @param value new value for the field
     * @throws RuntimeException if the field is missing or inaccessible
     */
    protected static void setValue(Object object, String fieldName, Object value) {
        try {
            Field field = findField(object.getClass(), fieldName);
            field.setAccessible(true);
            field.set(object, value);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new RuntimeException(INCOMPATIBLE_VERSION, e);
        }
    }

    /** Finds a declared field on {@code start} or the nearest superclass that declares it. */
    private static Field findField(Class<?> start, String fieldName) throws NoSuchFieldException {
        NoSuchFieldException firstFailure = null;
        for (Class<?> current = start; current != null; current = current.getSuperclass()) {
            try {
                return current.getDeclaredField(fieldName);
            } catch (NoSuchFieldException e) {
                // Not declared at this level; remember the most specific failure and keep climbing.
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        throw firstFailure != null ? firstFailure : new NoSuchFieldException(fieldName);
    }

    /** Finds a declared method on {@code start} or the nearest superclass that declares it. */
    private static Method findMethod(Class<?> start, String methodName, Class<?>[] argTypes)
            throws NoSuchMethodException {
        NoSuchMethodException firstFailure = null;
        for (Class<?> current = start; current != null; current = current.getSuperclass()) {
            try {
                return current.getDeclaredMethod(methodName, argTypes);
            } catch (NoSuchMethodException e) {
                // Not declared at this level; remember the most specific failure and keep climbing.
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        throw firstFailure != null ? firstFailure : new NoSuchMethodException(methodName);
    }
}