Duplicated messages in Kafka sink topic using Flink cancel-with-savepoint operation


Nastaran

Hi,
I have a Flink streaming job implemented in Java which reads some messages from a Kafka topic, transforms them, and finally sends them to another Kafka topic.
The Flink version is 1.6.2 and the Kafka version is 0.11. I pass the Semantic.EXACTLY_ONCE parameter to the producer. The problem is that when I cancel the job with a savepoint and then restart it from that savepoint, I get duplicated messages in the sink.
Am I missing some Kafka/Flink configuration needed to avoid duplication?


Kind regards,

Nastaran Motavalli




Re: Duplicated messages in Kafka sink topic using Flink cancel-with-savepoint operation

Piotr Nowojski
Hi Nastaran,

When you are checking for duplicated messages, are you reading from Kafka in `read_committed` mode (this is not the default value)?


> Semantic.EXACTLY_ONCE: uses Kafka transactions to provide exactly-once semantic. Whenever you write to Kafka using
> transactions, do not forget about setting desired isolation.level (read_committed or read_uncommitted - the latter one is the
> default value) for any application consuming records from Kafka.
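Below is a minimal sketch, in plain Java, of consumer properties for whatever application reads the sink topic to check for duplicates. The keys `bootstrap.servers`, `group.id` and `isolation.level` are standard Kafka consumer settings. The broker address and group id are hypothetical placeholders. The resulting `Properties` could then be passed to a `KafkaConsumer` or a `FlinkKafkaConsumer011`.

```java
import java.util.Properties;

public class ReadCommittedConsumerConfig {

    // Builds consumer properties for an application that reads the sink topic.
    // Kafka's default for "isolation.level" is "read_uncommitted", which also
    // returns records from transactions that are still open or were aborted.
    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Only return records whose transaction has been committed.
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker address and consumer group, for illustration only.
        Properties props = consumerProperties("localhost:9092", "sink-verifier");
        System.out.println("isolation.level = " + props.getProperty("isolation.level"));
    }
}
```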

Does the problem always happen?

Piotrek

On 28 Nov 2018, at 08:56, Nastaran Motavali <[hidden email]> wrote:


Re: Duplicated messages in Kafka sink topic using Flink cancel-with-savepoint operation

Nastaran

Thanks for your helpful response.
Setting the consumer's `isolation.level` property to `read_committed` solved the problem!
There are in fact still some duplicated messages in the sink topic, but they are uncommitted. When a `read_committed` Kafka consumer reads from this sink, it does not receive the duplicates, so everything is OK.



Kind regards,

Nastaran Motavalli




From: Piotr Nowojski <[hidden email]>
Sent: Thursday, November 29, 2018 3:38:38 PM
To: Nastaran Motavali
Cc: [hidden email]
Subject: Re: Duplicated messages in Kafka sink topic using Flink cancel-with-savepoint operation

Re: Duplicated messages in Kafka sink topic using Flink cancel-with-savepoint operation

Piotr Nowojski
Good to hear that :)

Duplicated “uncommitted” messages are normal and to be expected. After all, that’s what `read_uncommitted` is for: reading messages without waiting until they are committed, and therefore even if their transaction is later aborted.
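The following toy model in plain Java illustrates where those uncommitted duplicates come from. It does not use the Kafka or Flink API, and the transaction ids and record values are made up. It assumes the usual two-phase-commit sequence after cancel-with-savepoint and restore:

* the transaction holding records written before the savepoint is committed;
* the transaction opened after the savepoint is aborted;
* the source replays from the savepoint offsets, so the same input is written again in a new transaction.

The model also ignores that a real `read_committed` consumer only reads up to the last stable offset while transactions are still open.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SavepointReplayModel {

    // A record in a toy transactional topic: its payload plus the id of the
    // producer transaction that wrote it.
    static final class Rec {
        final String value;
        final int txnId;

        Rec(String value, int txnId) {
            this.value = value;
            this.txnId = txnId;
        }
    }

    // Hypothetical sink log after cancel-with-savepoint and restore:
    // txn 1 holds records from before the savepoint and is committed,
    // txn 2 holds records written after the savepoint and is aborted on restore,
    // txn 3 holds the same input replayed from the savepoint offsets and is committed.
    static List<Rec> sinkLog() {
        return Arrays.asList(
                new Rec("a", 1), new Rec("b", 1),
                new Rec("c", 2), new Rec("d", 2),
                new Rec("c", 3), new Rec("d", 3));
    }

    static final Set<Integer> COMMITTED = new HashSet<>(Arrays.asList(1, 3));

    // read_uncommitted returns every record in the log; read_committed skips
    // records whose transaction never committed.
    static List<String> read(List<Rec> log, Set<Integer> committedTxns, boolean readCommitted) {
        List<String> out = new ArrayList<>();
        for (Rec r : log) {
            if (!readCommitted || committedTxns.contains(r.txnId)) {
                out.add(r.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println("read_uncommitted: " + read(sinkLog(), COMMITTED, false));
        System.out.println("read_committed:   " + read(sinkLog(), COMMITTED, true));
    }
}
```

Under these assumptions, the `read_uncommitted` reader sees `[a, b, c, d, c, d]`, while the `read_committed` reader sees each record once: `[a, b, c, d]`.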

Piotrek

On 1 Dec 2018, at 14:44, Nastaran Motavali <[hidden email]> wrote:
