Hi team, I'm a bit confused about where two-phase commit fits in with Kafka's transaction-aware producer (configured via transactional.id) and the consumer's enable.auto.commit setting.
What I understand of Flink checkpoints (correct me if I'm wrong) is that a checkpoint saves the transaction IDs as well as the consumer offsets, so when the application fails and restarts, it reprocesses everything from the last checkpoint and the output is handled idempotently on the Kafka side (exactly-once processing rather than exactly-once delivery). The question is: where does two-phase commit play a role here?
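For reference, the transaction-aware producer mentioned above is Kafka's standard transactional producer API. A minimal standalone sketch (broker address, topic name, and the transactional.id value are placeholders) looks roughly like this:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.put("transactional.id", "my-app-producer-1"); // hypothetical id; enables the transactional producer
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();   // registers the transactional.id and fences older producers using it
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("output-topic", "key", "value")); // hypothetical topic
            producer.commitTransaction();  // only now do the records become visible to read_committed consumers
        }
    }
}

Flink's exactly-once Kafka sink drives this same begin/commit cycle internally; the question of when commitTransaction() may be called is exactly where the checkpoint-based 2PC described below comes in.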
Hi Kevin,

Perhaps the easiest way to answer your question is to go through how the exactly-once FlinkKafkaProducer works as a 2PC implementation on top of Flink's checkpointing mechanism. The phases can be broken down as follows (simplified, assuming at most 1 concurrent checkpoint and that checkpoint completion notifications are never late):

1. Between checkpoints, each parallel sink writes its output records into a currently open Kafka transaction.
2. Pre-commit: when the checkpoint barrier reaches a sink, the sink flushes the open transaction and writes its transaction id into its checkpointed state. Completing its part of the checkpoint is the sink's vote that it is ready to commit.
3. Commit: once the JM has received acknowledgements from all operators and sinks, the checkpoint is complete and the completion notification is sent out. On receiving it, each sink commits its Kafka transaction and opens a new one for the next checkpoint period.
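To make the setup concrete, a minimal sketch of wiring this up with the universal FlinkKafkaProducer (broker address, topic name, and the 60 s checkpoint interval are placeholder choices) could look roughly like this:

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceSinkSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoints drive the 2PC: pre-commit happens at the checkpoint barrier,
        // commit happens when the checkpoint-complete notification arrives from the JM.
        env.enableCheckpointing(60_000L); // 60 s interval, picked arbitrarily for the example

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        // The Kafka transaction stays open from pre-commit until the commit after checkpoint
        // completion, so its timeout must comfortably exceed the checkpoint interval.
        props.setProperty("transaction.timeout.ms", "900000");

        KafkaSerializationSchema<String> schema = (element, timestamp) ->
                new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8)); // hypothetical topic

        FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>(
                "output-topic",                            // default target topic (hypothetical)
                schema,
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // selects the transactional / 2PC code path

        env.fromElements("a", "b", "c").addSink(sink);
        env.execute("exactly-once-sink-sketch");
    }
}

If transaction.timeout.ms is shorter than the checkpoint interval plus the commit delay, the broker can abort the transaction before the sink gets a chance to commit it, so it is usually raised well above the checkpoint interval.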
There are some edge cases that are handled, e.g. a checkpoint is considered complete, but the job fails before all sinks receive the completion notification and commit their transactions (that's why the txn ids are written into the checkpoint as well, to make sure all txns belonging to that checkpoint are still eventually committed after restore).

The general takeaway is that each parallel sink operator can commit its Kafka transactions only after all participants in the 2PC (i.e. all Flink operators and sinks) acknowledge that they are ready to commit. In Flink terms, the JM is the coordinator, and an operator / sink completing its checkpoint is acknowledging that it is ready to commit.

From an end-to-end point of view, downstream consumers of the output Kafka topic will not see the records (assuming they are consuming in Kafka's read_committed mode) until the upstream Flink application's sinks commit the open Kafka transactions. This boils down to: the read latency for downstream applications is at least the upstream Flink app's checkpoint interval.

Hope this helps!

Cheers,
Gordon
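To make the read-latency point concrete, a downstream consumer opting into read_committed is configured roughly like the sketch below (broker address, group id, and topic name are placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedConsumerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "downstream-app");          // hypothetical consumer group
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        // Only records from committed transactions are returned; open or aborted ones are filtered out.
        props.put("isolation.level", "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("output-topic")); // hypothetical topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Records show up here only after the upstream Flink sink commits,
                    // i.e. roughly one checkpoint interval after they were produced.
                    System.out.println(record.value());
                }
            }
        }
    }
}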
+ [hidden email] (adding the conversation back to the user mailing list)
Flink does commit the offsets back to Kafka when the sources perform a checkpoint, but those offsets are not used by Flink for fault tolerance and restore; they are purely a means of exposing consumption progress. Flink only respects the offsets written to its checkpoints: those offsets are essentially the state of the FlinkKafkaConsumer sources and are written into the checkpoint by the sources themselves. As previously explained, the final COMMIT phase comes after that, i.e. after all Flink operators have completed their state checkpoint.
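As an illustrative sketch of that distinction (broker address, group id, topic name, and the 60 s checkpoint interval are made-up values), committing offsets back to Kafka can be toggled on the source, but restore always goes through the offsets in Flink's own checkpoint:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class SourceOffsetSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // offsets enter the checkpoint at this cadence

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "my-flink-app");            // hypothetical consumer group

        FlinkKafkaConsumer<String> source =
                new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props); // hypothetical topic

        // Offsets committed back to Kafka on checkpoint completion only serve external
        // monitoring (e.g. consumer-lag dashboards); on restore, Flink uses the offsets
        // stored in its own checkpointed source state, not Kafka's committed offsets.
        source.setCommitOffsetsOnCheckpoints(true);

        env.addSource(source).print();
        env.execute("source-offset-sketch");
    }
}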
No, that is not guaranteed. While Flink guarantees that checkpoint-complete notifications will eventually be received by all listening operators (e.g. the Kafka sinks), the job can indeed fail when only some of the sinks have received the notification (and committed). The way Flink handles the issue you mentioned is that all pending-commit transaction ids are part of the sink's state: when a sink checkpoints its state (during the pre-commit phase), it writes out all pending-commit transaction ids. If for any reason the job fails and a failover is triggered, the restored latest complete checkpoint will contain those pending-commit transaction ids, and committing those pending transactions will then be re-attempted. So, in the end, you can see it this way: the transactions will all eventually be successfully committed, even in the event of a failure.
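Schematically, these hooks correspond to Flink's TwoPhaseCommitSinkFunction, which FlinkKafkaProducer is built on. The skeleton below (written against recent Flink versions) only illustrates the lifecycle: it logs instead of talking to Kafka, and the String transaction handle stands in for a real transaction object:

import java.util.UUID;

import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

public class TwoPhaseCommitSkeleton extends TwoPhaseCommitSinkFunction<String, String, Void> {

    public TwoPhaseCommitSkeleton() {
        // Serializers for the transaction handle and the (unused) context type.
        super(StringSerializer.INSTANCE, VoidSerializer.INSTANCE);
    }

    @Override
    protected String beginTransaction() {
        // Called whenever a new transaction is needed (at startup and after each checkpoint).
        String txnId = "txn-" + UUID.randomUUID();
        System.out.println("begin " + txnId);
        return txnId;
    }

    @Override
    protected void invoke(String txnId, String value, Context context) {
        // Records are written into the currently open transaction.
        System.out.println("write " + value + " into " + txnId);
    }

    @Override
    protected void preCommit(String txnId) {
        // Runs when the checkpoint barrier reaches the sink: flush everything; the txn id
        // is stored in the sink's checkpointed state as pending-commit.
        System.out.println("pre-commit (flush) " + txnId);
    }

    @Override
    protected void commit(String txnId) {
        // Runs on the checkpoint-complete notification, i.e. once the JM (the 2PC
        // coordinator) reports that all participants finished the checkpoint.
        System.out.println("commit " + txnId);
    }

    @Override
    protected void recoverAndCommit(String txnId) {
        // Runs after restore for pending-commit txn ids found in the checkpoint: the failure
        // happened between checkpoint completion and the commit, so the commit is retried.
        commit(txnId);
    }

    @Override
    protected void abort(String txnId) {
        // Runs for transactions that never made it into a completed checkpoint.
        System.out.println("abort " + txnId);
    }
}

Roughly speaking, FlinkKafkaProducer overrides recoverAndCommit() to resume the Kafka transaction recorded in the restored state and commit it, which is the eventual-commit behaviour described above.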