Flink KafkaProducer flushing on savepoints

Witzany, Tomas
Hi,
I have a question about the at-least-once guarantees of Kafka producers when checkpointing is disabled. In our data pipeline we have a Flink job running on an unbounded stream, and it originally had checkpointing turned on. In addition, the job is cancelled with a savepoint once a day so that we can do some data pre- and post-processing for the next day; afterwards, the job is restarted from that savepoint.

The issue is that we want to turn off checkpointing, since it gives us little value and only creates extra I/O. When we do so, the following message shows up:
"Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing."
This prompted us to investigate, and it seems that with checkpointing disabled there are no at-least-once guarantees.

What if checkpointing is disabled, but you take savepoints yourself and restore from them? In the code, savepoints are handled the same way as checkpoints, yet the Flink producer makes it impossible to have flushing turned on while checkpointing is disabled. I can see why this is the case, since keeping the flushing flag on adds some synchronization overhead. Is there a way to keep checkpointing disabled and still get at-least-once guarantees on savepoints?
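A simplified, hypothetical model of the behaviour described above may help. It is not the connector's code: the class `FlushOnSnapshotModel` and its methods are illustrative names, and the producer's asynchronous send callbacks are reduced to a queue of unacknowledged records. It only captures the point at issue: the flush flag is dropped in `open()` when checkpointing is off, and every snapshot, whether from a periodic checkpoint or a savepoint, then completes without waiting for in-flight records.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical, simplified model of a Kafka sink's flush-on-checkpoint logic. */
final class FlushOnSnapshotModel {

    // Records handed to the (simulated) asynchronous Kafka client but not yet acknowledged.
    private final Deque<String> pending = new ArrayDeque<>();
    private boolean flushOnCheckpoint = true;

    /** Mirrors the logged warning: the flag is dropped when checkpointing is off. */
    public void open(boolean checkpointingEnabled) {
        if (flushOnCheckpoint && !checkpointingEnabled) {
            System.out.println("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
            flushOnCheckpoint = false;
        }
    }

    /** Asynchronous send: the record is not yet durable in Kafka. */
    public void invoke(String record) {
        pending.add(record);
    }

    /**
     * Called for every snapshot, savepoints included. Returns how many records are
     * still unacknowledged when the snapshot completes; a non-zero value means they
     * can be lost if the job is cancelled right after the savepoint.
     */
    public int snapshotState() {
        if (flushOnCheckpoint) {
            pending.clear(); // simulate flush(): wait until every pending send is acknowledged
        }
        return pending.size();
    }

    public static void main(String[] args) {
        for (boolean checkpointing : new boolean[] {true, false}) {
            FlushOnSnapshotModel sink = new FlushOnSnapshotModel();
            sink.open(checkpointing);
            sink.invoke("a");
            sink.invoke("b");
            System.out.println("checkpointing=" + checkpointing
                    + ", unacknowledged after savepoint=" + sink.snapshotState());
        }
    }
}
```

Running `main` prints the warning once, then reports 0 unacknowledged records with checkpointing enabled and 2 with it disabled.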

The only thing I can think of is to enable checkpointing with a very long interval, so that checkpoints are (almost) never triggered. But this is a hack.

Tomas Witzany

Re: Flink KafkaProducer flushing on savepoints

Piotr Nowojski-4
Hi,

Which Flink version and which FlinkKafkaProducer version are you using? `FlinkKafkaProducerBase` is no longer used in the latest version, so I would guess you are on an older Flink version and FlinkKafkaProducer010 or earlier (no longer supported).

I would suggest using either the universal FlinkKafkaProducer or FlinkKafkaProducer011 (if you are on a really old Flink version that doesn't have the universal Kafka connector). Both of those should work with any Kafka version, and from looking at the code it seems to me that neither of them has the problem you mentioned. If you select `org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic#AT_LEAST_ONCE` and disable checkpointing, it should still flush records on savepoints.

> The only thing I can think of is to enable checkpointing with a very long interval, so that checkpoints are (almost) never triggered. But this is a hack.

Yes, it would be a hack. But it would work.

Best,
Piotrek

In reply to Witzany, Tomas (Tue, 2 Mar 2021 at 12:09).

Re: Flink KafkaProducer flushing on savepoints

Witzany, Tomas
Hi,
Thanks for your answer. Unfortunately, it seems I will not be able to upgrade to the newer universal Flink producer because of the older Kafka version I am reading from, so for now I will have to go with the hack.
Thanks

In reply to Piotr Nowojski (3 March 2021 at 21:10).

Re: Flink KafkaProducer flushing on savepoints

Piotr Nowojski-4
Yes, that might be an issue. As far as I remember, the universal connector works with Kafka 0.10.x or higher.
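If the 0.10.x lower bound recalled above is right, the choice comes down to a version check. The helper below is hypothetical (`ConnectorChoice` and `supportsUniversalConnector` are made-up names), and the threshold is only the one quoted from memory in this thread. It compares dotted versions numerically, because a plain string comparison would wrongly place "0.9" after "0.10".

```java
/**
 * Hypothetical helper: is the universal Kafka connector an option for a given broker
 * version? The "0.10" threshold is the one recalled in this thread; verify it against
 * the Flink documentation for the release you run.
 */
final class ConnectorChoice {

    private static final String UNIVERSAL_MIN_VERSION = "0.10";

    /** Numeric comparison of dotted versions, so that "0.9.0.1" sorts before "0.10.2". */
    static int compareVersions(String a, String b) {
        String[] x = a.split("\\.");
        String[] y = b.split("\\.");
        int n = Math.max(x.length, y.length);
        for (int i = 0; i < n; i++) {
            // Missing components count as 0, so "0.10" equals "0.10.0".
            int xi = i < x.length ? Integer.parseInt(x[i]) : 0;
            int yi = i < y.length ? Integer.parseInt(y[i]) : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    static boolean supportsUniversalConnector(String brokerVersion) {
        return compareVersions(brokerVersion, UNIVERSAL_MIN_VERSION) >= 0;
    }

    public static void main(String[] args) {
        for (String v : new String[] {"0.9.0.1", "0.10.2", "2.7.0"}) {
            System.out.println(v + " -> " + (supportsUniversalConnector(v)
                    ? "universal FlinkKafkaProducer"
                    : "keep the long-checkpoint-interval workaround"));
        }
    }
}
```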

Piotrek

In reply to Witzany, Tomas (Fri, 5 Mar 2021 at 11:20).