Committing Kafka Transactions during Savepoint

Committing Kafka Transactions during Savepoint

Scott Kidder
I recently began using the exactly-once processing semantic with the Kafka 0.11 producer in Flink 1.4.2. It's been working great!

Are Kafka transactions committed when creating a Flink savepoint? How does this affect the recovery behavior in Flink if, before the completion of the next checkpoint, the application is restarted and restores from a checkpoint taken before the savepoint? It seems like this might lead to the Kafka producer writing a message multiple times with different committed Kafka transactions.
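The concern can be pictured with a toy sketch (plain Python, not Flink code; the broker model and all names here are hypothetical simplifications): if the savepoint's transaction were committed independently, and the job then restored from an earlier checkpoint, the replayed records would land in a fresh transaction and be committed a second time.

```python
# Toy model of the duplicate-write scenario: transactions, checkpoints,
# and the "broker" are simplified stand-ins, not the Kafka/Flink APIs.

class Broker:
    def __init__(self):
        self.committed = []   # records visible to consumers
        self.open_txns = {}   # txn_id -> buffered records

    def begin(self, txn_id):
        self.open_txns[txn_id] = []

    def write(self, txn_id, record):
        self.open_txns[txn_id].append(record)

    def commit(self, txn_id):
        self.committed.extend(self.open_txns.pop(txn_id))

broker = Broker()

# Epoch 1: checkpoint C1 completes and its transaction is committed.
broker.begin("txn-c1")
broker.write("txn-c1", "A")
broker.commit("txn-c1")

# Epoch 2: a savepoint is taken and (per the question) its transaction
# is committed as well.
broker.begin("txn-sp")
broker.write("txn-sp", "B")
broker.commit("txn-sp")

# Failure: the job restarts from checkpoint C1, i.e. from *before* the
# savepoint. The source replays record "B" inside a fresh transaction.
broker.begin("txn-c2")
broker.write("txn-c2", "B")   # replayed input
broker.commit("txn-c2")

print(broker.committed)       # ['A', 'B', 'B'] -> "B" is duplicated
```

This is exactly the hazard the question describes: each commit is individually transactional, but nothing ties the committed transactions back to the checkpoint the job actually restores from.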

--
Scott Kidder

Re: Committing Kafka Transactions during Savepoint

Aljoscha Krettek
Hi,

This has been in the back of my head for a while now. I finally created a Jira issue: https://issues.apache.org/jira/browse/FLINK-9983

In there, I also outline a better fix that will take a bit longer to implement.

Best,
Aljoscha



Re: Committing Kafka Transactions during Savepoint

Scott Kidder
Thank you, Aljoscha! Are Kafka transactions committed when a running job has been instructed to cancel with a savepoint (e.g. `flink cancel -s xxxx`)? This is my primary use for savepoints. I would expect that when a new job is submitted with the savepoint, as in the case of an application upgrade, Flink will create a new Kafka transaction and processing will be exactly-once.

--Scott Kidder


Re: Committing Kafka Transactions during Savepoint

vino yang
Hi Scott,

For EXACTLY_ONCE on the sink side with the Kafka 0.11+ producer, the answer is yes. The official documentation gives a good overview of this topic [1].


Thanks, vino.






Re: Committing Kafka Transactions during Savepoint

Aljoscha Krettek
Hi Scott,

Some more clarifications:

Doing a stop-with-savepoint will suspend the checkpoint coordinator, meaning that no new checkpoints will happen between taking the savepoint and shutting down the job. This means you will be safe from duplicates if you only use savepoints for this.

Regarding committing of the transactions: they might be committed, but they probably won't be, because there is no mechanism that ensures side effects of completed checkpoints take effect before shutting down the job after taking the savepoint. The transactional sinks work like this:

1) Do the checkpoint, where we prepare the transaction, and notify the checkpoint coordinator that our checkpoint is "complete".
2) Wait for the message from the checkpoint coordinator that all checkpoints (from all parallel operators) are complete.
3) Commit the transaction.

That last step is currently not guaranteed to happen when stopping with a savepoint. However, when restarting a job from a savepoint the sink will check whether there are any open transactions that should have been committed (it knows this because they are stored in state) and then commits them.
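The three steps, plus the restore-time recommit, can be sketched as follows. This is a hypothetical simplification in plain Python of how a two-phase-commit sink behaves, not the actual Flink `TwoPhaseCommitSinkFunction` API; all class and method names here are made up for illustration.

```python
# Toy broker supporting prepare (flush/pre-commit) and commit.
class Broker:
    def __init__(self):
        self.committed, self.open, self.prepared = [], {}, {}

    def begin(self, txn):
        self.open[txn] = []

    def write(self, txn, record):
        self.open[txn].append(record)

    def prepare(self, txn):
        # Pre-commit: make the data durable, but not yet visible.
        self.prepared[txn] = self.open.pop(txn)

    def commit(self, txn):
        self.committed.extend(self.prepared.pop(txn))

class TwoPhaseCommitSink:
    def __init__(self, broker):
        self.broker = broker
        self.current_txn = None
        self.pending = []             # pre-committed txns awaiting commit
        self.state = {"pending": []}  # what a savepoint/checkpoint stores

    def open_transaction(self, txn):
        self.current_txn = txn
        self.broker.begin(txn)

    def write(self, record):
        self.broker.write(self.current_txn, record)

    # Step 1: on checkpoint, pre-commit and snapshot pending txns.
    def snapshot(self, next_txn):
        self.broker.prepare(self.current_txn)
        self.pending.append(self.current_txn)
        self.state["pending"] = list(self.pending)
        self.open_transaction(next_txn)

    # Steps 2+3: on "checkpoint complete" notification, commit.
    def notify_checkpoint_complete(self):
        for txn in self.pending:
            self.broker.commit(txn)
        self.pending = []

    # On restore: recommit txns that were prepared but never committed,
    # e.g. when the job was stopped right after taking a savepoint.
    @classmethod
    def restore(cls, broker, state):
        sink = cls(broker)
        for txn in state["pending"]:
            broker.commit(txn)  # safe: prepare already made it durable
        return sink

broker = Broker()
sink = TwoPhaseCommitSink(broker)
sink.open_transaction("txn-1")
sink.write("A")
sink.snapshot("txn-2")        # savepoint: "A" is prepared, not committed
# ...job shuts down before the completion notification arrives...

restored = TwoPhaseCommitSink.restore(broker, sink.state)
print(broker.committed)       # ['A'] -> committed exactly once, on restore

# Normal path after restore: commit happens via the notification.
restored.open_transaction("txn-3")
restored.write("B")
restored.snapshot("txn-4")
restored.notify_checkpoint_complete()
print(broker.committed)       # ['A', 'B']
```

The key property is that `restore` only recommits transactions recorded in the snapshotted state, so a record is never committed twice even when step 3 is skipped at shutdown.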

This works but is a bit fragile, so it's high on my list of things I want to see fixed in Flink 1.7.

Best,
Aljoscha
