Kafka transaction error lead to data loss under end to end exact-once


Lu Niu
Hi,

We are running Flink + Kafka with end-to-end exactly-once semantics and encountered the exception below, which usually follows checkpoint failures:
```
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
2020-07-28 16:27:51,633 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job xxx (f08fc4b1edceb3705e2cb134a8ece73d) switched from state RUNNING to FAILING.
java.lang.RuntimeException: Error while confirming checkpoint
at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:295)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:842)
at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1214)
... 5 more

```
We ran some end-to-end tests and noticed that whenever this happens, data is lost.

Based on several related questions, I understand that I need to increase `transaction.timeout.ms` because:
```
Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions that were started before taking a checkpoint, after recovering from the said checkpoint. If the time between Flink application crash and completed restart is larger than Kafka’s transaction timeout there will be data loss (Kafka will automatically abort transactions that exceeded timeout time).
```

But I want to confirm with the community:
Does an exception like this always lead to data loss?

I ask because we sometimes get this exception even when the checkpoint succeeds.

Setup:
Flink 1.9.1

Best
Lu

Re: Kafka transaction error lead to data loss under end to end exact-once

r_khachatryan
Hi Lu,

Yes, this error indicates data loss (unless there were no records in the transactions).

Regards,
Roman


On Mon, Aug 3, 2020 at 9:14 PM Lu Niu <[hidden email]> wrote:
Does an exception like this always lead to data loss?

Re: Kafka transaction error lead to data loss under end to end exact-once

Lu Niu
Hi, Khachatryan

Thank you for the reply. Is this a problem that can be fixed? If so, is the fix on the roadmap? Thanks!

Best
Lu

On Tue, Aug 4, 2020 at 1:24 PM Khachatryan Roman <[hidden email]> wrote:
Yes, this error indicates data loss (unless there were no records in the transactions).

Re: Kafka transaction error lead to data loss under end to end exact-once

r_khachatryan
Hi Lu,

AFAIK, it's not going to be fixed. As you mentioned in the first email, Kafka should be configured so that its transaction timeout is greater than your max checkpoint duration.

However, you should change not only `transaction.timeout.ms` in the producer but also `transaction.max.timeout.ms` on your brokers.
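
A minimal sketch of how the two settings relate, in plain Java with only `java.util.Properties`. The property keys `transaction.timeout.ms` (producer) and `transaction.max.timeout.ms` (broker) are real Kafka settings; the class name, the helper methods and the chosen values are assumptions for illustration. The check mirrors the broker-side rule that a producer may not request a transaction timeout larger than the broker's maximum.

```java
import java.util.Properties;

final class TransactionTimeouts {
    // Real Kafka property names; the values used in main() are illustrative.
    static final String PRODUCER_TIMEOUT = "transaction.timeout.ms";
    static final String BROKER_MAX_TIMEOUT = "transaction.max.timeout.ms";

    /** Builds producer properties carrying the requested transaction timeout. */
    static Properties producerProps(long timeoutMs) {
        Properties props = new Properties();
        props.setProperty(PRODUCER_TIMEOUT, Long.toString(timeoutMs));
        return props;
    }

    /** True if the producer's timeout fits within the broker-side maximum. */
    static boolean acceptedByBroker(Properties producer, long brokerMaxTimeoutMs) {
        long requested = Long.parseLong(producer.getProperty(PRODUCER_TIMEOUT));
        return requested <= brokerMaxTimeoutMs;
    }

    public static void main(String[] args) {
        long oneHourMs = 60L * 60 * 1000;
        long fifteenMinutesMs = 15L * 60 * 1000; // Kafka's default broker maximum

        Properties producer = producerProps(oneHourMs);
        System.out.println(acceptedByBroker(producer, fifteenMinutesMs)); // false
        System.out.println(acceptedByBroker(producer, oneHourMs));        // true
    }
}
```

In a real job, the `Properties` object would be the one handed to the Flink Kafka producer, while `transaction.max.timeout.ms` is set in the broker configuration and needs a broker-side change.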

Regards,
Roman


On Wed, Aug 5, 2020 at 12:24 AM Lu Niu <[hidden email]> wrote:
Is this a problem that can be fixed? If so, is the fix on the roadmap?

Re: Kafka transaction error lead to data loss under end to end exact-once

Piotr Nowojski-4
Hi Lu,

In this case, judging from the rather fragmented log/error message that you posted, the job has failed, so Flink did detect an issue, and that probably means data loss in Kafka. In such a case you could probably recover some of the lost records by reading from Kafka in `read_uncommitted` mode, but that can lead to data duplication.
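
A small sketch of the consumer setting this refers to, assuming a plain Kafka consumer configured through `java.util.Properties`. `isolation.level` is a real Kafka consumer property; the class name, broker address and group id below are hypothetical. With `read_uncommitted`, the consumer also returns records from open and aborted transactions, which is where the duplicates can come from.

```java
import java.util.Properties;

final class RecoveryConsumerConfig {
    /** Consumer properties that also expose records of aborted transactions. */
    static Properties readUncommitted(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // read_committed would hide exactly the records we want to recover here.
        props.setProperty("isolation.level", "read_uncommitted");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker address and group id, for illustration only.
        Properties props = readUncommitted("localhost:9092", "recovery-audit");
        System.out.println(props.getProperty("isolation.level"));
    }
}
```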

However, a very similar error can be logged by Flink as WARN during recovery. In that case it can mean either:
- data loss because of timeouts (keep in mind that Kafka transaction timeouts must cover: checkpoint interval + downtime during the failure + time to restart and recover the Flink job)
- the transaction was already committed just before the failure happened

and unfortunately there is no way to distinguish those two cases using the Kafka API.
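
The timeout budget from the first bullet can be sketched as simple arithmetic. This is only an illustration: the class and method names are made up, and the durations in `main` are hypothetical numbers, not measurements from any real job.

```java
import java.time.Duration;

final class TransactionTimeoutBudget {
    /** Lower bound for the transaction timeout: the sum of the three phases. */
    static Duration minimumTimeout(Duration checkpointInterval,
                                   Duration downtime,
                                   Duration restartAndRecovery) {
        return checkpointInterval.plus(downtime).plus(restartAndRecovery);
    }

    /** True if the configured timeout covers the whole budget. */
    static boolean covers(Duration configuredTimeout, Duration required) {
        return configuredTimeout.compareTo(required) >= 0;
    }

    public static void main(String[] args) {
        // Hypothetical numbers, for illustration only.
        Duration required = minimumTimeout(
                Duration.ofMinutes(10), Duration.ofMinutes(30), Duration.ofMinutes(5));
        System.out.println(required.toMinutes());                     // 45
        System.out.println(covers(Duration.ofMinutes(15), required)); // false
        System.out.println(covers(Duration.ofHours(1), required));    // true
    }
}
```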

Piotrek


On Wed, Aug 5, 2020 at 10:17 AM Khachatryan Roman <[hidden email]> wrote:
However, you should change not only transaction.timeout.ms in the producer but also transaction.max.timeout.ms on your brokers.