Relation between Two Phase Commit and Kafka's transaction aware producer

Relation between Two Phase Commit and Kafka's transaction aware producer

Kevin Kwon
Hi team, I'm a bit confused about how Two-Phase Commit and Kafka's transaction-aware producer (using transactional.id and enable.auto.commit) play together.

My understanding of a Flink checkpoint (correct me if I'm wrong) is that it saves the transaction ID as well as the consumer's committed offsets, so when the application fails and restarts, it will reprocess everything from the last checkpoint and the data will be idempotently processed on the Kafka side (exactly-once processing rather than exactly-once delivery).

The question is: where does two-phase commit play a role here?

Re: Relation between Two Phase Commit and Kafka's transaction aware producer

Tzu-Li (Gordon) Tai
Hi Kevin,

Perhaps the easiest way to answer your question is to walk through how the exactly-once FlinkKafkaProducer works as a 2PC implementation on top of Flink's checkpointing mechanism.

The phases can be broken down as follows (simplified, assuming at most 1 concurrent checkpoint and that checkpoint completion notifications are never late):
  1. BEGIN_TXN: For each checkpoint interval, each FlinkKafkaProducer sink operator creates a new Kafka transaction. You can assume that on startup, a new Kafka transaction is created immediately for records that occur before the first checkpoint.
  2. PRE_COMMIT: Once a FlinkKafkaProducer sink operator receives Flink's checkpoint barrier, it flushes pending records to the currently open transaction and opens a new one for future records, which belong to the next checkpoint and thus should be written to the next transaction. Once flushed, the sink operator acknowledges that it has completed its checkpoint.
  3. COMMIT: Once all sinks acknowledge checkpoint completion, the Flink checkpoint is considered complete (containing the state of all operators + consumer offsets). Once that happens, Flink notifies each sink operator of the completion, and only upon receiving this notification can the sink operator commit the previous transaction.
There are some edge cases that are handled, e.g. a checkpoint is considered complete, but the job fails before all sinks receive the completion notification and commit their transactions (that's why txn ids are written into the checkpoint as well, to make sure all txns belonging to that checkpoint are still eventually committed after restore).

The general takeaway is that each parallel sink operator can commit its Kafka transaction only after all participants in the 2PC (i.e. all Flink operators and sinks) acknowledge that they are ready to commit.
In Flink terms, the JM is the coordinator, and an operator / sink completing its checkpoint is acknowledging that it is ready to commit.
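To make this concrete, here is a minimal, illustrative sketch of one parallel sink and how the three phases map onto checkpoint hooks. The class and all names in it are made up for illustration; this is not the actual FlinkKafkaProducer / TwoPhaseCommitSinkFunction code, just the shape of the protocol described above.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

class TwoPhaseKafkaSinkSketch {

    // Hypothetical handle for a single Kafka transaction.
    interface KafkaTxn {
        void write(String record);
        void flush();
        void commit();
    }

    private final Supplier<KafkaTxn> txnFactory;
    private KafkaTxn current;                                         // open txn for the current checkpoint period
    private final Deque<KafkaTxn> pendingCommit = new ArrayDeque<>(); // pre-committed, awaiting the completion notification

    TwoPhaseKafkaSinkSketch(Supplier<KafkaTxn> txnFactory) {
        this.txnFactory = txnFactory;
        beginTransaction();               // BEGIN_TXN on startup, before the first checkpoint
    }

    // BEGIN_TXN: open a fresh transaction for the records of the next checkpoint period.
    private void beginTransaction() {
        current = txnFactory.get();
    }

    // Normal processing between checkpoints: records go into the open transaction.
    void invoke(String record) {
        current.write(record);
    }

    // PRE_COMMIT: called when the checkpoint barrier reaches the sink. The pending-commit
    // txn id would also go into the sink's checkpointed state (see the follow-up message).
    void snapshotState(long checkpointId) {
        current.flush();                  // flush pending records into the open txn
        pendingCommit.add(current);
        beginTransaction();               // records after the barrier belong to the next txn
    }

    // COMMIT: called only after the JM has received acknowledgements from all operators,
    // i.e. the checkpoint is globally complete.
    void notifyCheckpointComplete(long checkpointId) {
        while (!pendingCommit.isEmpty()) {
            pendingCommit.poll().commit();
        }
    }
}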

From an end-to-end point of view, downstream consumers of the output Kafka topic will not see records (assuming they are consuming in Kafka's read_committed mode) until the upstream Flink application's sinks commit the open Kafka transactions.
This boils down to the read latency for downstream applications being at least the upstream Flink app's checkpoint interval.
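For the downstream side, read_committed is an ordinary Kafka consumer setting. A small sketch of such a consumer, with made-up broker address, group id and topic name:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReadCommittedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");       // placeholder broker
        props.setProperty("group.id", "downstream-app");                // placeholder group id
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Only records from committed transactions are returned; data from open or
        // aborted transactions stays invisible to this consumer.
        props.setProperty("isolation.level", "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("flink-output-topic"));           // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(r -> System.out.println(r.value()));
            }
        }
    }
}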

Hope this helps!

Cheers,
Gordon

Re: Relation between Two Phase Commit and Kafka's transaction aware producer

Tzu-Li (Gordon) Tai
+ [hidden email]  (adding the conversation back to the user mailing list)

On Fri, Mar 12, 2021 at 6:06 AM Kevin Kwon <[hidden email]> wrote:
Thanks Tzu-Li

Interesting algorithm. Is the consumer offset also committed to Kafka at the last COMMIT stage, after the checkpoint has completed?

Flink does commit the offsets back to Kafka when sources perform checkpoints, but those offsets are not used by Flink for fault tolerance and restore. They are purely a means of exposing consumption progress.
Flink only respects the offsets written into its checkpoints. Those offsets are essentially the state of the FlinkKafkaConsumer sources, and are written to the checkpoints by the sources.
As previously explained, the final COMMIT stage comes after that, i.e. after all Flink operators complete their state checkpoint.
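For reference, a hedged sketch of what this looks like on the source side with the Flink 1.12-era FlinkKafkaConsumer API (broker address, group id, topic name and checkpoint interval are made-up placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class OffsetCommitSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Fault tolerance relies on the offsets stored in the checkpointed source state,
        // not on the offsets committed back to Kafka.
        env.enableCheckpointing(60_000);                                 // placeholder: 1-minute checkpoints

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");        // placeholder broker
        props.setProperty("group.id", "my-flink-app");                   // placeholder group id

        FlinkKafkaConsumer<String> source =
                new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);
        // Offsets committed back to Kafka on checkpoints are only for exposing progress
        // (e.g. consumer-lag monitoring); they do not affect Flink's restore behavior.
        source.setCommitOffsetsOnCheckpoints(true);

        env.addSource(source).print();
        env.execute("offset-commit-sketch");
    }
}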
 

Also, does the coordinator (JM) write any data to a write-ahead log before sending out commit messages to all Flink entities? I'm concerned about the case where the JM succeeds in sending a commit message to some entities but fails to reach others and then dies.

No. And indeed, while Flink guarantees that checkpoint completion notifications will eventually be received by all listening operators (e.g. the Kafka sinks), the job can indeed fail when only some of the sinks have received the notification (and committed).
The way Flink handles the issue you mentioned is that all pending-commit transaction ids are part of the sink's state.
When a sink checkpoints its state (during the pre-commit phase), it writes all pending-commit transaction ids. If for any reason the job fails and failover is triggered, the restored latest complete checkpoint will contain those pending-commit transaction ids.
Then, Flink will attempt to commit those pending transactions.
So, in the end, you can see it as: the transactions will all eventually be successfully committed, even in the event of a failure.
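As a rough illustration, this restore step could be added to the hypothetical sink sketched in the earlier reply (again, illustrative names only, not the actual Flink code):

// On restore, the pending-commit transaction ids found in the recovered sink state
// belonged to a checkpoint that already completed, so they are committed here, and a
// fresh transaction is opened for new records.
void initializeState(java.util.List<KafkaTxn> restoredPendingTxns) {
    for (KafkaTxn txn : restoredPendingTxns) {
        txn.commit();             // ensures every pre-committed txn is eventually committed
    }
    beginTransaction();
}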
 

Finally, it seems 2PC is implemented in order to keep 3 entities, Kafka producer data / Kafka consumer offsets / the Flink checkpoint, in a consistent state. However, since the checkpoint is an ever-increasing, ledger-like state that prunes previous state as it goes, isn't a write-ahead log on the sink side enough to provide the exactly-once processing guarantee? What I mean is: check the state between the WAL and the current checkpoint status, conservatively roll back to the previous checkpoint, and replay all data.
