After reading about FlinkKafkaProducer011 and 2PC function in FLINK, I know,
when snapshotState(),
- preCommit currentTransaction.
- add <currentTransaction, newTransaction> to the State.
when Checkpoint done and notifyCheckpointComplete(),
- producer will commit currentTransaction to brokers.
when initializeState(),
- restore from State.
- commit currentTransaction and abort newTransaction.
And I have one question, what happens if program fails after notifyCheckpointComplete() done?
As my opinion, when it recovers, it will re-commit what has committed which results duplicate.