After our analysis, we found a bug in the org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.resumeTransaction method
The analysis process is as follows:
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction
public void initializeState(FunctionInitializationContext context) throws Exception {
state = context.getOperatorStateStore().getListState(stateDescriptor);
boolean recoveredUserContext = false;
if (context.isRestored()) {
LOG.info("{} - restoring state", name());
for (State<TXN, CONTEXT> operatorState : state.get()) {
userContext = operatorState.getContext();
List<TransactionHolder<TXN>> recoveredTransactions =
operatorState.getPendingCommitTransactions();
List<TXN> handledTransactions = new ArrayList<>(recoveredTransactions.size() + 1);
for (TransactionHolder<TXN> recoveredTransaction : recoveredTransactions) {
// If this fails to succeed eventually, there is actually data loss
recoverAndCommitInternal(recoveredTransaction);
handledTransactions.add(recoveredTransaction.handle);
LOG.info("{} committed recovered transaction {}", name(), recoveredTransaction);
}
{
TXN transaction = operatorState.getPendingTransaction().handle;
recoverAndAbort(transaction);
handledTransactions.add(transaction);
LOG.info(
"{} aborted recovered transaction {}",
name(),
operatorState.getPendingTransaction());
}
if (userContext.isPresent()) {
finishRecoveringContext(handledTransactions);
recoveredUserContext = true;
}
}
}
(1)recoverAndCommitInternal(recoveredTransaction);
The previous transactionalid, producerId and epoch in the state are used to commit the transaction,However, we find that the producerIdAndEpoch of transactionManager is a static constant (ProducerIdAndEpoch.NONE), which pollutes the static constant ProducerIdAndEpoch.NONE
@Override
protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
try {
producer = initTransactionalProducer(transaction.transactionalId, false);
producer.resumeTransaction(transaction.producerId, transaction.epoch);
producer.commitTransaction();
} catch (InvalidTxnStateException | ProducerFencedException ex) {
// That means we have committed this transaction before.
LOG.warn(
"Encountered error {} while recovering transaction {}. "
+ "Presumably this transaction has been already committed before",
ex,
transaction);
} finally {
if (producer != null) {
producer.close(0, TimeUnit.SECONDS);
}
}
}
}
public void resumeTransaction(long producerId, short epoch) {
synchronized (producerClosingLock) {
ensureNotClosed();
Preconditions.checkState(
producerId >= 0 && epoch >= 0,
"Incorrect values for producerId %s and epoch %s",
producerId,
epoch);
LOG.info(
"Attempting to resume transaction {} with producerId {} and epoch {}",
transactionalId,
producerId,
epoch);
Object transactionManager = getField(kafkaProducer, "transactionManager");
synchronized (transactionManager) {
Object topicPartitionBookkeeper =
getField(transactionManager, "topicPartitionBookkeeper");
invoke(
transactionManager,
"transitionTo",
getEnum(
"org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
invoke(topicPartitionBookkeeper, "reset");
Object producerIdAndEpoch = getField(transactionManager, "producerIdAndEpoch");
setField(producerIdAndEpoch, "producerId", producerId);
setField(producerIdAndEpoch, "epoch", epoch);
invoke(
transactionManager,
"transitionTo",
getEnum(
"org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
invoke(
transactionManager,
"transitionTo",
getEnum(
"org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
setField(transactionManager, "transactionStarted", true);
}
}
}
public TransactionManager(LogContext logContext,
String transactionalId,
int transactionTimeoutMs,
long retryBackoffMs,
ApiVersions apiVersions) {
this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
this.transactionalId = transactionalId;
this.log = logContext.logger(TransactionManager.class);
this.transactionTimeoutMs = transactionTimeoutMs;
this.transactionCoordinator = null;
this.consumerGroupCoordinator = null;
this.newPartitionsInTransaction = new HashSet<>();
this.pendingPartitionsInTransaction = new HashSet<>();
this.partitionsInTransaction = new HashSet<>();
this.pendingRequests = new PriorityQueue<>(10, Comparator.comparingInt(o -> o.priority().priority));
this.pendingTxnOffsetCommits = new HashMap<>();
this.partitionsWithUnresolvedSequences = new HashMap<>();
this.partitionsToRewriteSequences = new HashSet<>();
this.retryBackoffMs = retryBackoffMs;
this.topicPartitionBookkeeper = new TopicPartitionBookkeeper();
this.apiVersions = apiVersions;
}
public class ProducerIdAndEpoch {
public static final ProducerIdAndEpoch NONE = new ProducerIdAndEpoch(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH);
public final long producerId;
public final short epoch;
public ProducerIdAndEpoch(long producerId, short epoch) {
this.producerId = producerId;
this.epoch = epoch;
}
public boolean isValid() {
return RecordBatch.NO_PRODUCER_ID < producerId;
}
@Override
public String toString() {
return "(producerId=" + producerId + ", epoch=" + epoch + ")";
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ProducerIdAndEpoch that = (ProducerIdAndEpoch) o;
if (producerId != that.producerId) return false;
return epoch == that.epoch;
}
@Override
public int hashCode() {
int result = (int) (producerId ^ (producerId >>> 32));
result = 31 * result + (int) epoch;
return result;
}
}
(2)In the second step, recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState transaction), when initializing the transaction, producerId and epoch in the first step pollute ProducerIdAndEpoch.NONE. Therefore, when an initialization request is sent to Kafka, the values of the producerId and epoch variables in the request parameter ProducerIdAndEpoch.NONE are equal to the values of the producerId and epoch variables in the first transaction commit, not equal to - 1, - 1. So Kafka throws an exception:
Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at java.lang.Thread.run(Thread.java:748)
protected void recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
try {
producer = initTransactionalProducer(transaction.transactionalId, false);
producer.initTransactions();
} finally {
if (producer != null) {
producer.close(0, TimeUnit.SECONDS);
}
}
}
}
public synchronized TransactionalRequestResult initializeTransactions() {
return initializeTransactions(ProducerIdAndEpoch.NONE);
}
synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
boolean isEpochBump = producerIdAndEpoch != ProducerIdAndEpoch.NONE;
return handleCachedTransactionRequestResult(() -> {
// If this is an epoch bump, we will transition the state as part of handling the EndTxnRequest
if (!isEpochBump) {
transitionTo(State.INITIALIZING);
log.info("Invoking InitProducerId for the first time in order to acquire a producer ID");
} else {
log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch", producerIdAndEpoch);
}
InitProducerIdRequestData requestData = new InitProducerIdRequestData()
.setTransactionalId(transactionalId)
.setTransactionTimeoutMs(transactionTimeoutMs)
.setProducerId(producerIdAndEpoch.producerId)
.setProducerEpoch(producerIdAndEpoch.epoch);
InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
isEpochBump);
enqueueRequest(handler);
return handler.result;
}, State.INITIALIZING);
}