Kafka ProducerFencedException after checkpointing

classic Classic list List threaded Threaded
11 messages Options
Reply | Threaded
Open this post in threaded view
|

Kafka ProducerFencedException after checkpointing

Dongwon Kim-2

Hi,

I'm faced with the following ProducerFencedException after 1st, 3rd, 5th, 7th, ... checkpoints:
--
java.lang.RuntimeException: Error while confirming checkpoint
	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
--

FYI, I'm using Flink 1.4.0 and testing end-to-end exactly once processing using Kafka sink.
We use FsStateBackend to store snapshot data on HDFS.

As shown in configuration.png, my checkpoint configuration is:
- Checkpointing Mode : Exactly Once
- Interval : 15m 0s
- Timeout : 10m 0s
- Minimum Pause Between Checkpoints : 5m 0s
- Maximum Concurrent Checkpoints : 1
- Persist Checkpoints Externally : Disabled

After the first checkpoint completed [see history after 1st ckpt.png], the job is restarted due to the ProducerFencedException [see exception after 1st ckpt.png].
The first checkpoint takes less than 2 minutes while my checkpoint interval is 15m and minimum pause between checkpoints is 5m.
After the job is restarted, the second checkpoint is triggered after a while [see history after 2nd ckpt.png] and this time I've got no exception.
The third checkpoint results in the same exception as after the first checkpoint.

Can anybody let me know what's going wrong behind the scene?

Best,

Dongwon

history after 3rd ckpt.png (337K) Download Attachment
exception after 3rd ckpt.png (268K) Download Attachment
history after 2nd ckpt.png (338K) Download Attachment
configuration.png (307K) Download Attachment
summary.png (296K) Download Attachment
exception after 1st ckpt.png (258K) Download Attachment
history after 1st ckpt.png (298K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Kafka ProducerFencedException after checkpointing

Piotr Nowojski
Hi,

What’s your Kafka’s transaction timeout setting? Please both check Kafka producer configuration (transaction.timeout.ms property) and Kafka broker configuration. The most likely cause of such error message is when Kafka's timeout is smaller then Flink’s checkpoint interval and transactions are not committed quickly enough before timeout occurring.

Piotrek

On 17 Mar 2018, at 07:24, Dongwon Kim <[hidden email]> wrote:


Hi,

I'm faced with the following ProducerFencedException after 1st, 3rd, 5th, 7th, ... checkpoints:
--
java.lang.RuntimeException: Error while confirming checkpoint
	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
--

FYI, I'm using Flink 1.4.0 and testing end-to-end exactly once processing using Kafka sink.
We use FsStateBackend to store snapshot data on HDFS.

As shown in configuration.png, my checkpoint configuration is:
- Checkpointing Mode : Exactly Once
- Interval : 15m 0s
- Timeout : 10m 0s
- Minimum Pause Between Checkpoints : 5m 0s
- Maximum Concurrent Checkpoints : 1
- Persist Checkpoints Externally : Disabled

After the first checkpoint completed [see history after 1st ckpt.png], the job is restarted due to the ProducerFencedException [see exception after 1st ckpt.png].
The first checkpoint takes less than 2 minutes while my checkpoint interval is 15m and minimum pause between checkpoints is 5m.
After the job is restarted, the second checkpoint is triggered after a while [see history after 2nd ckpt.png] and this time I've got no exception.
The third checkpoint results in the same exception as after the first checkpoint.

Can anybody let me know what's going wrong behind the scene?

Best,

Dongwon
<history after 3rd ckpt.png><exception after 3rd ckpt.png><history after 2nd ckpt.png><configuration.png><summary.png><exception after 1st ckpt.png><history after 1st ckpt.png>

Reply | Threaded
Open this post in threaded view
|

Re: Kafka ProducerFencedException after checkpointing

Dongwon Kim-2
Hi Piotr,

We have set producer's [transaction.timeout.ms] to 15 minutes and have used the default setting for broker (15 mins).
As Flink's checkpoint interval is 15 minutes, it is not a situation where Kafka's timeout is smaller than Flink's checkpoint interval.
As our first checkpoint just takes 2 minutes, it seems like transaction is not committed properly.

Best,

- Dongwon





On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski <[hidden email]> wrote:
Hi,

What’s your Kafka’s transaction timeout setting? Please both check Kafka producer configuration (transaction.timeout.ms property) and Kafka broker configuration. The most likely cause of such error message is when Kafka's timeout is smaller then Flink’s checkpoint interval and transactions are not committed quickly enough before timeout occurring.

Piotrek

On 17 Mar 2018, at 07:24, Dongwon Kim <[hidden email]> wrote:


Hi,

I'm faced with the following ProducerFencedException after 1st, 3rd, 5th, 7th, ... checkpoints:
--
java.lang.RuntimeException: Error while confirming checkpoint
	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
--

FYI, I'm using Flink 1.4.0 and testing end-to-end exactly once processing using Kafka sink.
We use FsStateBackend to store snapshot data on HDFS.

As shown in configuration.png, my checkpoint configuration is:
- Checkpointing Mode : Exactly Once
- Interval : 15m 0s
- Timeout : 10m 0s
- Minimum Pause Between Checkpoints : 5m 0s
- Maximum Concurrent Checkpoints : 1
- Persist Checkpoints Externally : Disabled

After the first checkpoint completed [see history after 1st ckpt.png], the job is restarted due to the ProducerFencedException [see exception after 1st ckpt.png].
The first checkpoint takes less than 2 minutes while my checkpoint interval is 15m and minimum pause between checkpoints is 5m.
After the job is restarted, the second checkpoint is triggered after a while [see history after 2nd ckpt.png] and this time I've got no exception.
The third checkpoint results in the same exception as after the first checkpoint.

Can anybody let me know what's going wrong behind the scene?

Best,

Dongwon
<history after 3rd ckpt.png><exception after 3rd ckpt.png><history after 2nd ckpt.png><configuration.png><summary.png><exception after 1st ckpt.png><history after 1st ckpt.png>


Reply | Threaded
Open this post in threaded view
|

Re: Kafka ProducerFencedException after checkpointing

Piotr Nowojski
Hi,

Please increase transaction.timeout.ms to a greater value or decrease Flink’s checkpoint interval, I’m pretty sure the issue here is that those two values are overlapping. I think that’s even visible on the screenshots. First checkpoint completed started at 14:28:48 and ended at 14:30:43, while the second one started at 14:45:53 and ended at 14:49:16. That gives you minimal transaction duration of 15 minutes and 10 seconds, with maximal transaction duration of 21 minutes.

In HAPPY SCENARIO (without any failure and restarting), you should assume that your timeout interval should cover with some safety margin the period between start of a checkpoint and end of the NEXT checkpoint, since this is the upper bound how long the transaction might be used. In your case at least ~25 minutes.

On top of that, as described in the docs, https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance , in case of failure, your timeout must also be able to cover the additional downtime required for the successful job restart. Thus you should increase your timeout accordingly. 

Piotrek


On 20 Mar 2018, at 11:58, Dongwon Kim <[hidden email]> wrote:

Hi Piotr,

We have set producer's [transaction.timeout.ms] to 15 minutes and have used the default setting for broker (15 mins).
As Flink's checkpoint interval is 15 minutes, it is not a situation where Kafka's timeout is smaller than Flink's checkpoint interval.
As our first checkpoint just takes 2 minutes, it seems like transaction is not committed properly.

Best,

- Dongwon





On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski <[hidden email]> wrote:
Hi,

What’s your Kafka’s transaction timeout setting? Please both check Kafka producer configuration (transaction.timeout.ms property) and Kafka broker configuration. The most likely cause of such error message is when Kafka's timeout is smaller then Flink’s checkpoint interval and transactions are not committed quickly enough before timeout occurring.

Piotrek

On 17 Mar 2018, at 07:24, Dongwon Kim <[hidden email]> wrote:


Hi,

I'm faced with the following ProducerFencedException after 1st, 3rd, 5th, 7th, ... checkpoints:
--
java.lang.RuntimeException: Error while confirming checkpoint
	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
--

FYI, I'm using Flink 1.4.0 and testing end-to-end exactly once processing using Kafka sink.
We use FsStateBackend to store snapshot data on HDFS.

As shown in configuration.png, my checkpoint configuration is:
- Checkpointing Mode : Exactly Once
- Interval : 15m 0s
- Timeout : 10m 0s
- Minimum Pause Between Checkpoints : 5m 0s
- Maximum Concurrent Checkpoints : 1
- Persist Checkpoints Externally : Disabled

After the first checkpoint completed [see history after 1st ckpt.png], the job is restarted due to the ProducerFencedException [see exception after 1st ckpt.png].
The first checkpoint takes less than 2 minutes while my checkpoint interval is 15m and minimum pause between checkpoints is 5m.
After the job is restarted, the second checkpoint is triggered after a while [see history after 2nd ckpt.png] and this time I've got no exception.
The third checkpoint results in the same exception as after the first checkpoint.

Can anybody let me know what's going wrong behind the scene?

Best,

Dongwon
<history after 3rd ckpt.png><exception after 3rd ckpt.png><history after 2nd ckpt.png><configuration.png><summary.png><exception after 1st ckpt.png><history after 1st ckpt.png>



Reply | Threaded
Open this post in threaded view
|

Re: Kafka ProducerFencedException after checkpointing

Dongwon Kim-2
Hi Piotr,

Now my streaming pipeline is working without retries. 
I decreased Flink's checkpoint interval from 15min to 10min as you suggested [see screenshot_10min_ckpt.png].

I though that producer's transaction timeout starts when the external transaction starts.
The truth is that Producer's transaction timeout starts after the last external checkpoint is committed.
Now that I have 15min for Producer's transaction timeout and 10min for Flink's checkpoint interval, and every checkpoint takes less than 5 minutes, everything is working fine.
Am I right?

Anyway thank you very much for the detailed explanation!

Best,

Dongwon



On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski <[hidden email]> wrote:
Hi,

Please increase transaction.timeout.ms to a greater value or decrease Flink’s checkpoint interval, I’m pretty sure the issue here is that those two values are overlapping. I think that’s even visible on the screenshots. First checkpoint completed started at 14:28:48 and ended at 14:30:43, while the second one started at 14:45:53 and ended at 14:49:16. That gives you minimal transaction duration of 15 minutes and 10 seconds, with maximal transaction duration of 21 minutes.

In HAPPY SCENARIO (without any failure and restarting), you should assume that your timeout interval should cover with some safety margin the period between start of a checkpoint and end of the NEXT checkpoint, since this is the upper bound how long the transaction might be used. In your case at least ~25 minutes.

On top of that, as described in the docs, https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance , in case of failure, your timeout must also be able to cover the additional downtime required for the successful job restart. Thus you should increase your timeout accordingly. 

Piotrek


On 20 Mar 2018, at 11:58, Dongwon Kim <[hidden email]> wrote:

Hi Piotr,

We have set producer's [transaction.timeout.ms] to 15 minutes and have used the default setting for broker (15 mins).
As Flink's checkpoint interval is 15 minutes, it is not a situation where Kafka's timeout is smaller than Flink's checkpoint interval.
As our first checkpoint just takes 2 minutes, it seems like transaction is not committed properly.

Best,

- Dongwon





On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski <[hidden email]> wrote:
Hi,

What’s your Kafka’s transaction timeout setting? Please both check Kafka producer configuration (transaction.timeout.ms property) and Kafka broker configuration. The most likely cause of such error message is when Kafka's timeout is smaller then Flink’s checkpoint interval and transactions are not committed quickly enough before timeout occurring.

Piotrek

On 17 Mar 2018, at 07:24, Dongwon Kim <[hidden email]> wrote:


Hi,

I'm faced with the following ProducerFencedException after 1st, 3rd, 5th, 7th, ... checkpoints:
--
java.lang.RuntimeException: Error while confirming checkpoint
	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
--

FYI, I'm using Flink 1.4.0 and testing end-to-end exactly once processing using Kafka sink.
We use FsStateBackend to store snapshot data on HDFS.

As shown in configuration.png, my checkpoint configuration is:
- Checkpointing Mode : Exactly Once
- Interval : 15m 0s
- Timeout : 10m 0s
- Minimum Pause Between Checkpoints : 5m 0s
- Maximum Concurrent Checkpoints : 1
- Persist Checkpoints Externally : Disabled

After the first checkpoint completed [see history after 1st ckpt.png], the job is restarted due to the ProducerFencedException [see exception after 1st ckpt.png].
The first checkpoint takes less than 2 minutes while my checkpoint interval is 15m and minimum pause between checkpoints is 5m.
After the job is restarted, the second checkpoint is triggered after a while [see history after 2nd ckpt.png] and this time I've got no exception.
The third checkpoint results in the same exception as after the first checkpoint.

Can anybody let me know what's going wrong behind the scene?

Best,

Dongwon
<history after 3rd ckpt.png><exception after 3rd ckpt.png><history after 2nd ckpt.png><configuration.png><summary.png><exception after 1st ckpt.png><history after 1st ckpt.png>





screenshot_10min_ckpt.png (370K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Kafka ProducerFencedException after checkpointing

Piotr Nowojski
Hi,

But that’s exactly the case: producer’s transaction timeout starts when the external transaction starts - but FlinkKafkaProducer011 keeps an active Kafka transaction for the whole period between checkpoints.

As I wrote in the previous message:

> in case of failure, your timeout must also be able to cover the additional downtime required for the successful job restart. Thus you should increase your timeout accordingly.

I think that 15 minutes timeout is a way too small value. If your job fails because of some intermittent failure (for example worker crash/restart), you will only have a couple of minutes for a successful Flink job restart. Otherwise you will lose some data (because of the transaction timeouts).

Piotrek

On 21 Mar 2018, at 10:30, Dongwon Kim <[hidden email]> wrote:

Hi Piotr,

Now my streaming pipeline is working without retries. 
I decreased Flink's checkpoint interval from 15min to 10min as you suggested [see screenshot_10min_ckpt.png].

I though that producer's transaction timeout starts when the external transaction starts.
The truth is that Producer's transaction timeout starts after the last external checkpoint is committed.
Now that I have 15min for Producer's transaction timeout and 10min for Flink's checkpoint interval, and every checkpoint takes less than 5 minutes, everything is working fine.
Am I right?

Anyway thank you very much for the detailed explanation!

Best,

Dongwon



On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski <[hidden email]> wrote:
Hi,

Please increase transaction.timeout.ms to a greater value or decrease Flink’s checkpoint interval, I’m pretty sure the issue here is that those two values are overlapping. I think that’s even visible on the screenshots. First checkpoint completed started at 14:28:48 and ended at 14:30:43, while the second one started at 14:45:53 and ended at 14:49:16. That gives you minimal transaction duration of 15 minutes and 10 seconds, with maximal transaction duration of 21 minutes.

In HAPPY SCENARIO (without any failure and restarting), you should assume that your timeout interval should cover with some safety margin the period between start of a checkpoint and end of the NEXT checkpoint, since this is the upper bound how long the transaction might be used. In your case at least ~25 minutes.

On top of that, as described in the docs, https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance , in case of failure, your timeout must also be able to cover the additional downtime required for the successful job restart. Thus you should increase your timeout accordingly. 

Piotrek


On 20 Mar 2018, at 11:58, Dongwon Kim <[hidden email]> wrote:

Hi Piotr,

We have set producer's [transaction.timeout.ms] to 15 minutes and have used the default setting for broker (15 mins).
As Flink's checkpoint interval is 15 minutes, it is not a situation where Kafka's timeout is smaller than Flink's checkpoint interval.
As our first checkpoint just takes 2 minutes, it seems like transaction is not committed properly.

Best,

- Dongwon





On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski <[hidden email]> wrote:
Hi,

What’s your Kafka’s transaction timeout setting? Please both check Kafka producer configuration (transaction.timeout.ms property) and Kafka broker configuration. The most likely cause of such error message is when Kafka's timeout is smaller then Flink’s checkpoint interval and transactions are not committed quickly enough before timeout occurring.

Piotrek

On 17 Mar 2018, at 07:24, Dongwon Kim <[hidden email]> wrote:


Hi,

I'm faced with the following ProducerFencedException after 1st, 3rd, 5th, 7th, ... checkpoints:
--
java.lang.RuntimeException: Error while confirming checkpoint
	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
--

FYI, I'm using Flink 1.4.0 and testing end-to-end exactly once processing using Kafka sink.
We use FsStateBackend to store snapshot data on HDFS.

As shown in configuration.png, my checkpoint configuration is:
- Checkpointing Mode : Exactly Once
- Interval : 15m 0s
- Timeout : 10m 0s
- Minimum Pause Between Checkpoints : 5m 0s
- Maximum Concurrent Checkpoints : 1
- Persist Checkpoints Externally : Disabled

After the first checkpoint completed [see history after 1st ckpt.png], the job is restarted due to the ProducerFencedException [see exception after 1st ckpt.png].
The first checkpoint takes less than 2 minutes while my checkpoint interval is 15m and minimum pause between checkpoints is 5m.
After the job is restarted, the second checkpoint is triggered after a while [see history after 2nd ckpt.png] and this time I've got no exception.
The third checkpoint results in the same exception as after the first checkpoint.

Can anybody let me know what's going wrong behind the scene?

Best,

Dongwon
<history after 3rd ckpt.png><exception after 3rd ckpt.png><history after 2nd ckpt.png><configuration.png><summary.png><exception after 1st ckpt.png><history after 1st ckpt.png>




<screenshot_10min_ckpt.png>

Reply | Threaded
Open this post in threaded view
|

Re: Kafka ProducerFencedException after checkpointing

Tony Wei
Hi,

I had the same exception recently. I want to confirm that if it is due to transaction timeout,
then I will lose those data. Am I right? Can I make it fall back to at least once semantic in
this situation?

Best,
Tony Wei

Piotr Nowojski <[hidden email]> 於 2018年3月21日 週三 下午10:28寫道:
Hi,

But that’s exactly the case: producer’s transaction timeout starts when the external transaction starts - but FlinkKafkaProducer011 keeps an active Kafka transaction for the whole period between checkpoints.

As I wrote in the previous message:

> in case of failure, your timeout must also be able to cover the additional downtime required for the successful job restart. Thus you should increase your timeout accordingly.

I think that 15 minutes timeout is a way too small value. If your job fails because of some intermittent failure (for example worker crash/restart), you will only have a couple of minutes for a successful Flink job restart. Otherwise you will lose some data (because of the transaction timeouts).

Piotrek

On 21 Mar 2018, at 10:30, Dongwon Kim <[hidden email]> wrote:

Hi Piotr,

Now my streaming pipeline is working without retries. 
I decreased Flink's checkpoint interval from 15min to 10min as you suggested [see screenshot_10min_ckpt.png].

I though that producer's transaction timeout starts when the external transaction starts.
The truth is that Producer's transaction timeout starts after the last external checkpoint is committed.
Now that I have 15min for Producer's transaction timeout and 10min for Flink's checkpoint interval, and every checkpoint takes less than 5 minutes, everything is working fine.
Am I right?

Anyway thank you very much for the detailed explanation!

Best,

Dongwon



On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski <[hidden email]> wrote:
Hi,

Please increase transaction.timeout.ms to a greater value or decrease Flink’s checkpoint interval, I’m pretty sure the issue here is that those two values are overlapping. I think that’s even visible on the screenshots. First checkpoint completed started at 14:28:48 and ended at 14:30:43, while the second one started at 14:45:53 and ended at 14:49:16. That gives you minimal transaction duration of 15 minutes and 10 seconds, with maximal transaction duration of 21 minutes.

In HAPPY SCENARIO (without any failure and restarting), you should assume that your timeout interval should cover with some safety margin the period between start of a checkpoint and end of the NEXT checkpoint, since this is the upper bound how long the transaction might be used. In your case at least ~25 minutes.

On top of that, as described in the docs, https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance , in case of failure, your timeout must also be able to cover the additional downtime required for the successful job restart. Thus you should increase your timeout accordingly. 

Piotrek


On 20 Mar 2018, at 11:58, Dongwon Kim <[hidden email]> wrote:

Hi Piotr,

We have set producer's [transaction.timeout.ms] to 15 minutes and have used the default setting for broker (15 mins).
As Flink's checkpoint interval is 15 minutes, it is not a situation where Kafka's timeout is smaller than Flink's checkpoint interval.
As our first checkpoint just takes 2 minutes, it seems like transaction is not committed properly.

Best,

- Dongwon





On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski <[hidden email]> wrote:
Hi,

What’s your Kafka’s transaction timeout setting? Please both check Kafka producer configuration (transaction.timeout.ms property) and Kafka broker configuration. The most likely cause of such error message is when Kafka's timeout is smaller then Flink’s checkpoint interval and transactions are not committed quickly enough before timeout occurring.

Piotrek

On 17 Mar 2018, at 07:24, Dongwon Kim <[hidden email]> wrote:


Hi,

I'm faced with the following ProducerFencedException after 1st, 3rd, 5th, 7th, ... checkpoints:
--
java.lang.RuntimeException: Error while confirming checkpoint
	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
--

FYI, I'm using Flink 1.4.0 and testing end-to-end exactly once processing using Kafka sink.
We use FsStateBackend to store snapshot data on HDFS.

As shown in configuration.png, my checkpoint configuration is:
- Checkpointing Mode : Exactly Once
- Interval : 15m 0s
- Timeout : 10m 0s
- Minimum Pause Between Checkpoints : 5m 0s
- Maximum Concurrent Checkpoints : 1
- Persist Checkpoints Externally : Disabled

After the first checkpoint completed [see history after 1st ckpt.png], the job is restarted due to the ProducerFencedException [see exception after 1st ckpt.png].
The first checkpoint takes less than 2 minutes while my checkpoint interval is 15m and minimum pause between checkpoints is 5m.
After the job is restarted, the second checkpoint is triggered after a while [see history after 2nd ckpt.png] and this time I've got no exception.
The third checkpoint results in the same exception as after the first checkpoint.

Can anybody let me know what's going wrong behind the scene?

Best,

Dongwon
<history after 3rd ckpt.png><exception after 3rd ckpt.png><history after 2nd ckpt.png><configuration.png><summary.png><exception after 1st ckpt.png><history after 1st ckpt.png>




<screenshot_10min_ckpt.png>

Reply | Threaded
Open this post in threaded view
|

Re: Kafka ProducerFencedException after checkpointing

Piotr Nowojski
Hi,

Yes, if it’s due to transaction timeout you will lose the data.

Whether can you fallback to at least once, that depends on Kafka, not on Flink, since it’s the Kafka that timeouts those transactions and I don’t see in the documentation anything that could override this [1]. You might try disabling the mechanism via setting `transaction.abort.timed.out.transaction.cleanup.interval.ms` or `transaction.remove.expired.transaction.cleanup.interval.ms`, but that’s question more to Kafka guys. Maybe Becket could help with this.

Also it MIGHT be that Kafka doesn’t remove records from the topics when aborting the transaction and MAYBE you can still access them via “READ_UNCOMMITTED” mode. But that’s again, question to Kafka. 

Sorry that I can not help more.

If you do not care about exactly once, why don’t you just set the connector to at least once mode?

Piotrek

On 12 Aug 2019, at 06:29, Tony Wei <[hidden email]> wrote:

Hi,

I had the same exception recently. I want to confirm that if it is due to transaction timeout,
then I will lose those data. Am I right? Can I make it fall back to at least once semantic in
this situation?

Best,
Tony Wei

Piotr Nowojski <[hidden email]> 於 2018年3月21日 週三 下午10:28寫道:
Hi,

But that’s exactly the case: producer’s transaction timeout starts when the external transaction starts - but FlinkKafkaProducer011 keeps an active Kafka transaction for the whole period between checkpoints.

As I wrote in the previous message:

> in case of failure, your timeout must also be able to cover the additional downtime required for the successful job restart. Thus you should increase your timeout accordingly.

I think that 15 minutes timeout is a way too small value. If your job fails because of some intermittent failure (for example worker crash/restart), you will only have a couple of minutes for a successful Flink job restart. Otherwise you will lose some data (because of the transaction timeouts).

Piotrek

On 21 Mar 2018, at 10:30, Dongwon Kim <[hidden email]> wrote:

Hi Piotr,

Now my streaming pipeline is working without retries. 
I decreased Flink's checkpoint interval from 15min to 10min as you suggested [see screenshot_10min_ckpt.png].

I though that producer's transaction timeout starts when the external transaction starts.
The truth is that Producer's transaction timeout starts after the last external checkpoint is committed.
Now that I have 15min for Producer's transaction timeout and 10min for Flink's checkpoint interval, and every checkpoint takes less than 5 minutes, everything is working fine.
Am I right?

Anyway thank you very much for the detailed explanation!

Best,

Dongwon



On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski <[hidden email]> wrote:
Hi,

Please increase transaction.timeout.ms to a greater value or decrease Flink’s checkpoint interval, I’m pretty sure the issue here is that those two values are overlapping. I think that’s even visible on the screenshots. First checkpoint completed started at 14:28:48 and ended at 14:30:43, while the second one started at 14:45:53 and ended at 14:49:16. That gives you minimal transaction duration of 15 minutes and 10 seconds, with maximal transaction duration of 21 minutes.

In HAPPY SCENARIO (without any failure and restarting), you should assume that your timeout interval should cover with some safety margin the period between start of a checkpoint and end of the NEXT checkpoint, since this is the upper bound how long the transaction might be used. In your case at least ~25 minutes.

On top of that, as described in the docs, https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance , in case of failure, your timeout must also be able to cover the additional downtime required for the successful job restart. Thus you should increase your timeout accordingly. 

Piotrek


On 20 Mar 2018, at 11:58, Dongwon Kim <[hidden email]> wrote:

Hi Piotr,

We have set producer's [transaction.timeout.ms] to 15 minutes and have used the default setting for broker (15 mins).
As Flink's checkpoint interval is 15 minutes, it is not a situation where Kafka's timeout is smaller than Flink's checkpoint interval.
As our first checkpoint just takes 2 minutes, it seems like transaction is not committed properly.

Best,

- Dongwon





On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski <[hidden email]> wrote:
Hi,

What’s your Kafka’s transaction timeout setting? Please both check Kafka producer configuration (transaction.timeout.ms property) and Kafka broker configuration. The most likely cause of such error message is when Kafka's timeout is smaller then Flink’s checkpoint interval and transactions are not committed quickly enough before timeout occurring.

Piotrek

On 17 Mar 2018, at 07:24, Dongwon Kim <[hidden email]> wrote:


Hi,

I'm faced with the following ProducerFencedException after 1st, 3rd, 5th, 7th, ... checkpoints:
--
java.lang.RuntimeException: Error while confirming checkpoint
	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
--

FYI, I'm using Flink 1.4.0 and testing end-to-end exactly once processing using Kafka sink.
We use FsStateBackend to store snapshot data on HDFS.

As shown in configuration.png, my checkpoint configuration is:
- Checkpointing Mode : Exactly Once
- Interval : 15m 0s
- Timeout : 10m 0s
- Minimum Pause Between Checkpoints : 5m 0s
- Maximum Concurrent Checkpoints : 1
- Persist Checkpoints Externally : Disabled

After the first checkpoint completed [see history after 1st ckpt.png], the job is restarted due to the ProducerFencedException [see exception after 1st ckpt.png].
The first checkpoint takes less than 2 minutes while my checkpoint interval is 15m and minimum pause between checkpoints is 5m.
After the job is restarted, the second checkpoint is triggered after a while [see history after 2nd ckpt.png] and this time I've got no exception.
The third checkpoint results in the same exception as after the first checkpoint.

Can anybody let me know what's going wrong behind the scene?

Best,

Dongwon
<history after 3rd ckpt.png><exception after 3rd ckpt.png><history after 2nd ckpt.png><configuration.png><summary.png><exception after 1st ckpt.png><history after 1st ckpt.png>




<screenshot_10min_ckpt.png>


Reply | Threaded
Open this post in threaded view
|

Re: Kafka ProducerFencedException after checkpointing

Tony Wei
Hi Piotr,

Thanks a lot. I need exactly once in my use case, but instead of having the risk of losing data, at least once is more acceptable when error occurred.

Best,
Tony Wei

Piotr Nowojski <[hidden email]> 於 2019年8月12日 週一 下午3:27寫道:
Hi,

Yes, if it’s due to transaction timeout you will lose the data.

Whether can you fallback to at least once, that depends on Kafka, not on Flink, since it’s the Kafka that timeouts those transactions and I don’t see in the documentation anything that could override this [1]. You might try disabling the mechanism via setting `transaction.abort.timed.out.transaction.cleanup.interval.ms` or `transaction.remove.expired.transaction.cleanup.interval.ms`, but that’s question more to Kafka guys. Maybe Becket could help with this.

Also it MIGHT be that Kafka doesn’t remove records from the topics when aborting the transaction and MAYBE you can still access them via “READ_UNCOMMITTED” mode. But that’s again, question to Kafka. 

Sorry that I can not help more.

If you do not care about exactly once, why don’t you just set the connector to at least once mode?

Piotrek

On 12 Aug 2019, at 06:29, Tony Wei <[hidden email]> wrote:

Hi,

I had the same exception recently. I want to confirm that if it is due to transaction timeout,
then I will lose those data. Am I right? Can I make it fall back to at least once semantic in
this situation?

Best,
Tony Wei

Piotr Nowojski <[hidden email]> 於 2018年3月21日 週三 下午10:28寫道:
Hi,

But that’s exactly the case: producer’s transaction timeout starts when the external transaction starts - but FlinkKafkaProducer011 keeps an active Kafka transaction for the whole period between checkpoints.

As I wrote in the previous message:

> in case of failure, your timeout must also be able to cover the additional downtime required for the successful job restart. Thus you should increase your timeout accordingly.

I think that 15 minutes timeout is a way too small value. If your job fails because of some intermittent failure (for example worker crash/restart), you will only have a couple of minutes for a successful Flink job restart. Otherwise you will lose some data (because of the transaction timeouts).

Piotrek

On 21 Mar 2018, at 10:30, Dongwon Kim <[hidden email]> wrote:

Hi Piotr,

Now my streaming pipeline is working without retries. 
I decreased Flink's checkpoint interval from 15min to 10min as you suggested [see screenshot_10min_ckpt.png].

I though that producer's transaction timeout starts when the external transaction starts.
The truth is that Producer's transaction timeout starts after the last external checkpoint is committed.
Now that I have 15min for Producer's transaction timeout and 10min for Flink's checkpoint interval, and every checkpoint takes less than 5 minutes, everything is working fine.
Am I right?

Anyway thank you very much for the detailed explanation!

Best,

Dongwon



On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski <[hidden email]> wrote:
Hi,

Please increase transaction.timeout.ms to a greater value or decrease Flink’s checkpoint interval, I’m pretty sure the issue here is that those two values are overlapping. I think that’s even visible on the screenshots. First checkpoint completed started at 14:28:48 and ended at 14:30:43, while the second one started at 14:45:53 and ended at 14:49:16. That gives you minimal transaction duration of 15 minutes and 10 seconds, with maximal transaction duration of 21 minutes.

In HAPPY SCENARIO (without any failure and restarting), you should assume that your timeout interval should cover with some safety margin the period between start of a checkpoint and end of the NEXT checkpoint, since this is the upper bound how long the transaction might be used. In your case at least ~25 minutes.

On top of that, as described in the docs, https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance , in case of failure, your timeout must also be able to cover the additional downtime required for the successful job restart. Thus you should increase your timeout accordingly. 

Piotrek


On 20 Mar 2018, at 11:58, Dongwon Kim <[hidden email]> wrote:

Hi Piotr,

We have set producer's [transaction.timeout.ms] to 15 minutes and have used the default setting for broker (15 mins).
As Flink's checkpoint interval is 15 minutes, it is not a situation where Kafka's timeout is smaller than Flink's checkpoint interval.
As our first checkpoint just takes 2 minutes, it seems like transaction is not committed properly.

Best,

- Dongwon





On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski <[hidden email]> wrote:
Hi,

What’s your Kafka’s transaction timeout setting? Please both check Kafka producer configuration (transaction.timeout.ms property) and Kafka broker configuration. The most likely cause of such error message is when Kafka's timeout is smaller then Flink’s checkpoint interval and transactions are not committed quickly enough before timeout occurring.

Piotrek

On 17 Mar 2018, at 07:24, Dongwon Kim <[hidden email]> wrote:


Hi,

I'm faced with the following ProducerFencedException after 1st, 3rd, 5th, 7th, ... checkpoints:
--
java.lang.RuntimeException: Error while confirming checkpoint
	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
--

FYI, I'm using Flink 1.4.0 and testing end-to-end exactly once processing using Kafka sink.
We use FsStateBackend to store snapshot data on HDFS.

As shown in configuration.png, my checkpoint configuration is:
- Checkpointing Mode : Exactly Once
- Interval : 15m 0s
- Timeout : 10m 0s
- Minimum Pause Between Checkpoints : 5m 0s
- Maximum Concurrent Checkpoints : 1
- Persist Checkpoints Externally : Disabled

After the first checkpoint completed [see history after 1st ckpt.png], the job is restarted due to the ProducerFencedException [see exception after 1st ckpt.png].
The first checkpoint takes less than 2 minutes while my checkpoint interval is 15m and minimum pause between checkpoints is 5m.
After the job is restarted, the second checkpoint is triggered after a while [see history after 2nd ckpt.png] and this time I've got no exception.
The third checkpoint results in the same exception as after the first checkpoint.

Can anybody let me know what's going wrong behind the scene?

Best,

Dongwon
<history after 3rd ckpt.png><exception after 3rd ckpt.png><history after 2nd ckpt.png><configuration.png><summary.png><exception after 1st ckpt.png><history after 1st ckpt.png>




<screenshot_10min_ckpt.png>


Reply | Threaded
Open this post in threaded view
|

Re: Kafka ProducerFencedException after checkpointing

Piotr Nowojski
Hi,

Ok, I see. You can try to rewrite your logic (or maybe records schema by adding some ID fields) to manually deduplicating the records after processing them with at least once semantic. Such setup is usually simpler, with slightly better throughput and significantly better latency (end-to-end exactly once latency is limited by checkpointing time).

Piotrek

On 12 Aug 2019, at 11:12, Tony Wei <[hidden email]> wrote:

Hi Piotr,

Thanks a lot. I need exactly once in my use case, but instead of having the risk of losing data, at least once is more acceptable when error occurred.

Best,
Tony Wei

Piotr Nowojski <[hidden email]> 於 2019年8月12日 週一 下午3:27寫道:
Hi,

Yes, if it’s due to transaction timeout you will lose the data.

Whether can you fallback to at least once, that depends on Kafka, not on Flink, since it’s the Kafka that timeouts those transactions and I don’t see in the documentation anything that could override this [1]. You might try disabling the mechanism via setting `transaction.abort.timed.out.transaction.cleanup.interval.ms` or `transaction.remove.expired.transaction.cleanup.interval.ms`, but that’s question more to Kafka guys. Maybe Becket could help with this.

Also it MIGHT be that Kafka doesn’t remove records from the topics when aborting the transaction and MAYBE you can still access them via “READ_UNCOMMITTED” mode. But that’s again, question to Kafka. 

Sorry that I can not help more.

If you do not care about exactly once, why don’t you just set the connector to at least once mode?

Piotrek

On 12 Aug 2019, at 06:29, Tony Wei <[hidden email]> wrote:

Hi,

I had the same exception recently. I want to confirm that if it is due to transaction timeout,
then I will lose those data. Am I right? Can I make it fall back to at least once semantic in
this situation?

Best,
Tony Wei

Piotr Nowojski <[hidden email]> 於 2018年3月21日 週三 下午10:28寫道:
Hi,

But that’s exactly the case: producer’s transaction timeout starts when the external transaction starts - but FlinkKafkaProducer011 keeps an active Kafka transaction for the whole period between checkpoints.

As I wrote in the previous message:

> in case of failure, your timeout must also be able to cover the additional downtime required for the successful job restart. Thus you should increase your timeout accordingly.

I think that 15 minutes timeout is a way too small value. If your job fails because of some intermittent failure (for example worker crash/restart), you will only have a couple of minutes for a successful Flink job restart. Otherwise you will lose some data (because of the transaction timeouts).

Piotrek

On 21 Mar 2018, at 10:30, Dongwon Kim <[hidden email]> wrote:

Hi Piotr,

Now my streaming pipeline is working without retries. 
I decreased Flink's checkpoint interval from 15min to 10min as you suggested [see screenshot_10min_ckpt.png].

I though that producer's transaction timeout starts when the external transaction starts.
The truth is that Producer's transaction timeout starts after the last external checkpoint is committed.
Now that I have 15min for Producer's transaction timeout and 10min for Flink's checkpoint interval, and every checkpoint takes less than 5 minutes, everything is working fine.
Am I right?

Anyway thank you very much for the detailed explanation!

Best,

Dongwon



On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski <[hidden email]> wrote:
Hi,

Please increase transaction.timeout.ms to a greater value or decrease Flink’s checkpoint interval, I’m pretty sure the issue here is that those two values are overlapping. I think that’s even visible on the screenshots. First checkpoint completed started at 14:28:48 and ended at 14:30:43, while the second one started at 14:45:53 and ended at 14:49:16. That gives you minimal transaction duration of 15 minutes and 10 seconds, with maximal transaction duration of 21 minutes.

In HAPPY SCENARIO (without any failure and restarting), you should assume that your timeout interval should cover with some safety margin the period between start of a checkpoint and end of the NEXT checkpoint, since this is the upper bound how long the transaction might be used. In your case at least ~25 minutes.

On top of that, as described in the docs, https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance , in case of failure, your timeout must also be able to cover the additional downtime required for the successful job restart. Thus you should increase your timeout accordingly. 

Piotrek


On 20 Mar 2018, at 11:58, Dongwon Kim <[hidden email]> wrote:

Hi Piotr,

We have set producer's [transaction.timeout.ms] to 15 minutes and have used the default setting for broker (15 mins).
As Flink's checkpoint interval is 15 minutes, it is not a situation where Kafka's timeout is smaller than Flink's checkpoint interval.
As our first checkpoint just takes 2 minutes, it seems like transaction is not committed properly.

Best,

- Dongwon





On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski <[hidden email]> wrote:
Hi,

What’s your Kafka’s transaction timeout setting? Please both check Kafka producer configuration (transaction.timeout.ms property) and Kafka broker configuration. The most likely cause of such error message is when Kafka's timeout is smaller then Flink’s checkpoint interval and transactions are not committed quickly enough before timeout occurring.

Piotrek

On 17 Mar 2018, at 07:24, Dongwon Kim <[hidden email]> wrote:


Hi,

I'm faced with the following ProducerFencedException after 1st, 3rd, 5th, 7th, ... checkpoints:
--
java.lang.RuntimeException: Error while confirming checkpoint
	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
--

FYI, I'm using Flink 1.4.0 and testing end-to-end exactly once processing using Kafka sink.
We use FsStateBackend to store snapshot data on HDFS.

As shown in configuration.png, my checkpoint configuration is:
- Checkpointing Mode : Exactly Once
- Interval : 15m 0s
- Timeout : 10m 0s
- Minimum Pause Between Checkpoints : 5m 0s
- Maximum Concurrent Checkpoints : 1
- Persist Checkpoints Externally : Disabled

After the first checkpoint completed [see history after 1st ckpt.png], the job is restarted due to the ProducerFencedException [see exception after 1st ckpt.png].
The first checkpoint takes less than 2 minutes while my checkpoint interval is 15m and minimum pause between checkpoints is 5m.
After the job is restarted, the second checkpoint is triggered after a while [see history after 2nd ckpt.png] and this time I've got no exception.
The third checkpoint results in the same exception as after the first checkpoint.

Can anybody let me know what's going wrong behind the scene?

Best,

Dongwon
<history after 3rd ckpt.png><exception after 3rd ckpt.png><history after 2nd ckpt.png><configuration.png><summary.png><exception after 1st ckpt.png><history after 1st ckpt.png>




<screenshot_10min_ckpt.png>



Reply | Threaded
Open this post in threaded view
|

Re: Kafka ProducerFencedException after checkpointing

Matthias J. Sax-2
-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

`transaction.timeout.ms` is a producer setting, thus you can increase
it accordingly.

Note, that brokers bound the range via `transaction.max.timeout.ms`;
thus, you may need to increase this broker configs, too.


- -Matthias

On 8/12/19 2:43 AM, Piotr Nowojski wrote:

> Hi,
>
> Ok, I see. You can try to rewrite your logic (or maybe records
> schema by adding some ID fields) to manually deduplicating the
> records after processing them with at least once semantic. Such
> setup is usually simpler, with slightly better throughput and
> significantly better latency (end-to-end exactly once latency is
> limited by checkpointing time).
>
> Piotrek
>
>> On 12 Aug 2019, at 11:12, Tony Wei <[hidden email]
>> <mailto:[hidden email]>> wrote:
>>
>> Hi Piotr,
>>
>> Thanks a lot. I need exactly once in my use case, but instead of
>> having the risk of losing data, at least once is more acceptable
>> when error occurred.
>>
>> Best, Tony Wei
>>
>> Piotr Nowojski <[hidden email]
>> <mailto:[hidden email]>> 於 2019年8月12日 週一 下午3:27寫道:
>>
>> Hi,
>>
>> Yes, if it’s due to transaction timeout you will lose the data.
>>
>> Whether can you fallback to at least once, that depends on
>> Kafka, not on Flink, since it’s the Kafka that timeouts those
>> transactions and I don’t see in the documentation anything that
>> could override this [1]. You might try disabling the mechanism
>> via setting
>> `transaction.abort.timed.out.transaction.cleanup.interval.ms
>> <http://transaction.abort.timed.out.transaction.cleanup.interval.ms/>
`
>>
>>
or `transaction.remove.expired.transaction.cleanup.interval.ms
>> <http://transaction.remove.expired.transaction.cleanup.interval.ms/>`
,
>>
>>
but that’s question more to Kafka guys. Maybe Becket could help

>> with this.
>>
>> Also it MIGHT be that Kafka doesn’t remove records from the
>> topics when aborting the transaction and MAYBE you can still
>> access them via “READ_UNCOMMITTED” mode. But that’s again,
>> question to Kafka.
>>
>> Sorry that I can not help more.
>>
>> If you do not care about exactly once, why don’t you just set
>> the connector to at least once mode?
>>
>> Piotrek
>>
>>> On 12 Aug 2019, at 06:29, Tony Wei <[hidden email]
>>> <mailto:[hidden email]>> wrote:
>>>
>>> Hi,
>>>
>>> I had the same exception recently. I want to confirm that if
>>> it is due to transaction timeout, then I will lose those data.
>>> Am I right? Can I make it fall back to at least once semantic
>>> in this situation?
>>>
>>> Best, Tony Wei
>>>
>>> Piotr Nowojski <[hidden email]
>>> <mailto:[hidden email]>> 於 2018年3月21日 週三 下午10:28 寫道:
>>>
>>> Hi,
>>>
>>> But that’s exactly the case: producer’s transaction timeout
>>> starts when the external transaction starts - but
>>> FlinkKafkaProducer011 keeps an active Kafka transaction for the
>>> whole period between checkpoints.
>>>
>>> As I wrote in the previous message:
>>>
>>>> in case of failure, your timeout must also be able to cover
>>> the additional downtime required for the successful job
>>> restart. Thus you should increase your timeout accordingly.
>>>
>>> I think that 15 minutes timeout is a way too small value. If
>>> your job fails because of some intermittent failure (for
>>> example worker crash/restart), you will only have a couple of
>>> minutes for a successful Flink job restart. Otherwise you will
>>> lose some data (because of the transaction timeouts).
>>>
>>> Piotrek
>>>
>>>> On 21 Mar 2018, at 10:30, Dongwon Kim <[hidden email]
>>>> <mailto:[hidden email]>> wrote:
>>>>
>>>> Hi Piotr,
>>>>
>>>> Now my streaming pipeline is working without retries. I
>>>> decreased Flink's checkpoint interval from 15min to 10min as
>>>> you suggested [see screenshot_10min_ckpt.png].
>>>>
>>>> I though that producer's transaction timeout starts when the
>>>> external transaction starts. The truth is that Producer's
>>>> transaction timeout starts after the last external checkpoint
>>>> is committed. Now that I have 15min for Producer's
>>>> transaction timeout and 10min for Flink's checkpoint
>>>> interval, and every checkpoint takes less than 5 minutes,
>>>> everything is working fine. Am I right?
>>>>
>>>> Anyway thank you very much for the detailed explanation!
>>>>
>>>> Best,
>>>>
>>>> Dongwon
>>>>
>>>>
>>>>
>>>> On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski
>>>> <[hidden email] <mailto:[hidden email]>>
>>>> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Please increase transaction.timeout.ms
>>>> <http://transaction.timeout.ms/> to a greater value or
>>>> decrease Flink’s checkpoint interval, I’m pretty sure the
>>>> issue here is that those two values are overlapping. I think
>>>> that’s even visible on the screenshots. First checkpoint
>>>> completed started at 14:28:48 and ended at 14:30:43, while
>>>> the second one started at 14:45:53 and ended at 14:49:16.
>>>> That gives you minimal transaction duration of 15 minutes and
>>>> 10 seconds, with maximal transaction duration of 21 minutes.
>>>>
>>>> In HAPPY SCENARIO (without any failure and restarting), you
>>>> should assume that your timeout interval should cover with
>>>> some safety margin the period between start of a checkpoint
>>>> and end of the NEXT checkpoint, since this is the upper bound
>>>> how long the transaction might be used. In your case at least
>>>> ~25 minutes.
>>>>
>>>> On top of that, as described in the docs,
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/con
nectors/kafka.html#kafka-producers-and-fault-tolerance

>>>> , in case of failure, your timeout must also be able to cover
>>>> the additional downtime required for the successful job
>>>> restart. Thus you should increase your timeout accordingly.
>>>>
>>>> Piotrek
>>>>
>>>>
>>>>> On 20 Mar 2018, at 11:58, Dongwon Kim
>>>>> <[hidden email] <mailto:[hidden email]>>
>>>>> wrote:
>>>>>
>>>>> Hi Piotr,
>>>>>
>>>>> We have set producer's [transaction.timeout.ms
>>>>> <http://transaction.timeout.ms/>] to 15 minutes and have
>>>>> used the default setting for broker (15 mins). As Flink's
>>>>> checkpoint interval is 15 minutes, it is not a situation
>>>>> where Kafka's timeout is smaller than Flink's checkpoint
>>>>> interval. As our first checkpoint just takes 2 minutes, it
>>>>> seems like transaction is not committed properly.
>>>>>
>>>>> Best,
>>>>>
>>>>> - Dongwon
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski
>>>>> <[hidden email] <mailto:[hidden email]>>
>>>>> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> What’s your Kafka’s transaction timeout setting? Please
>>>>> both check Kafka producer configuration
>>>>> (transaction.timeout.ms <http://transaction.timeout.ms/>
>>>>> property) and Kafka broker configuration. The most likely
>>>>> cause of such error message is when Kafka's timeout is
>>>>> smaller then Flink’s checkpoint interval and transactions
>>>>> are not committed quickly enough before timeout occurring.
>>>>>
>>>>> Piotrek
>>>>>
>>>>>> On 17 Mar 2018, at 07:24, Dongwon Kim
>>>>>> <[hidden email] <mailto:[hidden email]>>
>>>>>> wrote:
>>>>>>
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'm faced with the following ProducerFencedException
>>>>>> after 1st, 3rd, 5th, 7th, ... checkpoints: --
>>>>>> java.lang.RuntimeException: Error while confirming
>>>>>> checkpoint at
>>>>>> org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>>>>>>
>>>>>>
at
>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.jav
a:511)
>>>>>>
>>>>>>
at
>>>>>> java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecu
tor.java:1149)
>>>>>>
>>>>>>
at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExec
utor.java:624)
>>>>>>
>>>>>>
at java.lang.Thread.run(Thread.java:748) Caused

>>>>>> by:
>>>>>> org.apache.kafka.common.errors.ProducerFencedException:
>>>>>> Producer attempted an operation with an old epoch. Either
>>>>>> there is a newer producer with the same transactionalId,
>>>>>> or the producer's transaction has been expired by the
>>>>>> broker. --
>>>>>>
>>>>>> FYI, I'm using Flink 1.4.0 and testing end-to-end exactly
>>>>>> once processing using Kafka sink. We use FsStateBackend
>>>>>> to store snapshot data on HDFS.
>>>>>>
>>>>>> As shown in configuration.png, my checkpoint
>>>>>> configuration is: - Checkpointing Mode : Exactly Once -
>>>>>> Interval : 15m 0s - Timeout : 10m 0s - Minimum Pause
>>>>>> Between Checkpoints : 5m 0s - Maximum Concurrent
>>>>>> Checkpoints : 1 - Persist Checkpoints Externally :
>>>>>> Disabled
>>>>>>
>>>>>> After the first checkpoint completed [see history after
>>>>>> 1st ckpt.png], the job is restarted due to the
>>>>>> ProducerFencedException [see exception after 1st
>>>>>> ckpt.png]. The first checkpoint takes less than 2
>>>>>> minutes while my checkpoint interval is 15m and minimum
>>>>>> pause between checkpoints is 5m. After the job is
>>>>>> restarted, the second checkpoint is triggered after a
>>>>>> while [see history after 2nd ckpt.png] and this time I've
>>>>>> got no exception. The third checkpoint results in the
>>>>>> same exception as after the first checkpoint.
>>>>>>
>>>>>> Can anybody let me know what's going wrong behind the
>>>>>> scene?
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Dongwon <history after 3rd ckpt.png><exception after 3rd
>>>>>> ckpt.png><history after 2nd
>>>>>> ckpt.png><configuration.png><summary.png><exception
>>>>>> after 1st ckpt.png><history after 1st ckpt.png>
>>>>>
>>>>>
>>>>
>>>>
>>>> <screenshot_10min_ckpt.png>
>>>
>>
>
-----BEGIN PGP SIGNATURE-----
Comment: GPGTools - https://gpgtools.org

iQIzBAEBCgAdFiEE8rTF/+V8YmXHQb9YY43Spj86DwAFAl1Rj6QACgkQY43Spj86
DwDaXhAAsghkoeh4DP1XD/Oty+1TkgD29+I96izx+JtKYhh1PSwbm+5LUBqFeGvk
N8mMJjyNhHNbOFbqkvjIwKpV3IAXRUCugDHtGWzbdVjpDFG/O5e11LydYtNg+VGA
VueQm/dV/W6EyrPjpmpXFCKrGDoVFcJ7JV9T4nAmkdML5BjNJUDAdK+ipsePEnZI
q2htM5NDE29Pl242uqnelHCgje0kXzpaPn20V8xnKTeEU/OmHcZVbDl/u1caOWE2
PnbeMuU06QfwdZ/2u7yVl2PH4l97YFB89h8W6HuDkbgvSAVtnG1OA6cMUNjyYOKl
gfRW2NE9YRnaaP1c+UIcnT14DlWbGzwl7DXy4213jxo9Dhjm08IjKNTQAqutWoeZ
kEto+k7h1bmP6W5prk4r6YNOEBzZoLWhSPhFl6un8NZetCys6E66ShXVSMFKzkDO
06mj9g0BDZI0uEggfFZ054I5mpMLGGWCIDjFm6i3LdpsDMpdY9izpmipEvKXfPzL
mBfPVddzz9DrvQmGPEs8KDa8nI+WvvJj2lIhtOY7uOX5fzlMxNZDW0ST0FQC/R0x
y4mssLVwa0OdPYFnXvyqsDd2KxAsG77K6D4N5rSMApHPHoSqdIgNmyuikGYGzmzu
w7ZdFKxrHiIHXuwthIEahfipXJ2nynAKi06k08z8HeQoxGkBt84=
=FqSe
-----END PGP SIGNATURE-----