Duplicate record writes to sink after job failure


Duplicate record writes to sink after job failure

cslotterback

We are running a Flink job that uses FlinkKafkaProducer09 as a sink, with consumer checkpointing enabled. When the job runs into communication issues with our Kafka cluster and throws an exception after the configured retries, the job restarts. Because we want at-least-once processing, we have setLogFailuresOnly set to false, and as a result every record between the last checkpoint and the exception is written again once the job recovers and reconnects successfully.
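For context, roughly how the sink is wired up (a minimal sketch; the broker address, topic names, group id, and intervals below are placeholders, not our real configuration):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;

public class KafkaAtLeastOnceJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing: consumer offsets are committed as part of each checkpoint,
        // and the job restarts from the last completed checkpoint on failure.
        env.enableCheckpointing(60_000);

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "kafka-broker:9092"); // placeholder
        kafkaProps.setProperty("group.id", "example-group");              // placeholder
        kafkaProps.setProperty("retries", "3");                           // producer retries before failing

        DataStream<String> records = env.addSource(
                new FlinkKafkaConsumer09<>("input-topic", new SimpleStringSchema(), kafkaProps));

        FlinkKafkaProducer09<String> producer =
                new FlinkKafkaProducer09<>("output-topic", new SimpleStringSchema(), kafkaProps);

        // false = fail the job on a produce error instead of only logging it.
        // Combined with restart-from-checkpoint this gives at-least-once, but every record
        // written between the last checkpoint and the failure is re-emitted on recovery.
        producer.setLogFailuresOnly(false);
        // Flush pending records on each checkpoint so data covered by a checkpoint is not lost.
        producer.setFlushOnCheckpoint(true);

        records.addSink(producer);
        env.execute("kafka-at-least-once-job");
    }
}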

 

We may not have the option to upgrade to the FlinkKafkaProducer011 connector, as our Kafka endpoint is external. Are there any known ways to avoid or mitigate duplicates with the older versions of FlinkKafkaProducer while still ensuring at-least-once message processing?

 


Re: Duplicate record writes to sink after job failure

Andrey Zagrebin-2
Hi Chris,

there is no way to provide exactly-once semantics and avoid duplicates without the transactions that have been available since Kafka 0.11.
The only approach I can think of is building a custom deduplication step on the consumer side,
e.g. an in-memory cache with eviction, or some other temporary storage, that keeps the set of already processed message ids. Even this approach only gives consistency to some extent.
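A minimal sketch of what such a deduplication step could look like (plain Java on the consuming side, assuming each message carries a unique id; the cache size and LRU eviction policy are just examples):

import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Drops messages whose id has already been seen, using a bounded LRU cache.
 * Eviction means very old duplicates can still slip through, so this only
 * mitigates duplicates within the window the cache covers.
 */
public class SeenIdsFilter {

    private final Map<String, Boolean> seenIds;

    public SeenIdsFilter(final int maxEntries) {
        // access-order LinkedHashMap evicting the eldest entry once maxEntries is exceeded
        this.seenIds = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxEntries;
            }
        };
    }

    /** Returns true if the id was not seen before and the message should be processed. */
    public synchronized boolean markAndCheck(String messageId) {
        return seenIds.put(messageId, Boolean.TRUE) == null;
    }
}

// Usage on the consumer side:
//   SeenIdsFilter filter = new SeenIdsFilter(100_000);
//   if (filter.markAndCheck(message.getId())) { process(message); } else { /* duplicate, skip */ }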

Best,
Andrey
