getting duplicate messages from duplicate jobs

avilevi
Hi,
This is quite confusing.
I submitted the same stateless job twice (actually, I uploaded it once). However, when I place a message on Kafka, it seems that both jobs consume it and publish the same result (we publish the result to another Kafka topic, so I actually see the message duplicated on Kafka). How can that be? Both jobs are using the same group id (the group id is fixed, not generated).

Kind regards
Avi

Re: getting duplicate messages from duplicate jobs

Dawid Wysakowicz-2
Hi Avi,

AFAIK Flink's Kafka consumer uses the low-level Kafka APIs and does not
participate in Kafka's partition assignment protocol. Instead, it
discovers all available partitions for the given topic and manages
offsets itself, which allows it to provide exactly-once guarantees with
regard to Flink's internal state.

Flink's Kafka consumer uses the group.id to derive starting offsets for
partitions, and it can also commit offsets back to Kafka for monitoring
purposes [1]. But, as I said, it does not participate in partition
assignment within a group, so the same partition may be read by
multiple consumers with the same group.id.
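To make the difference concrete, here is a toy, self-contained Java simulation (no Kafka client or broker involved; the class and method names are hypothetical). It contrasts group-managed assignment, as with `KafkaConsumer.subscribe()`, where a group's partitions are split among its members, with static assignment, as with `KafkaConsumer.assign()`, where each job simply takes every partition it discovers, which is roughly the behaviour described above. The round-robin split is only an illustration, not Kafka's actual assignor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation: no real Kafka or Flink classes are used.
class AssignmentDemo {

    // Group-managed assignment (like subscribe()): members of one group share
    // the partitions, so each partition has exactly one reader.
    // Round-robin is used here purely for illustration.
    static Map<String, List<Integer>> groupAssignment(List<String> members, int numPartitions) {
        Map<String, List<Integer>> result = new TreeMap<>();
        for (String m : members) {
            result.put(m, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            result.get(members.get(p % members.size())).add(p);
        }
        return result;
    }

    // Static assignment (like assign()): every job takes all discovered
    // partitions; the shared group.id is never consulted.
    static Map<String, List<Integer>> staticAssignment(List<String> jobs, int numPartitions) {
        Map<String, List<Integer>> result = new TreeMap<>();
        for (String job : jobs) {
            List<Integer> all = new ArrayList<>();
            for (int p = 0; p < numPartitions; p++) {
                all.add(p);
            }
            result.put(job, all);
        }
        return result;
    }

    // Counts how many readers each partition ends up with.
    static int[] readersPerPartition(Map<String, List<Integer>> assignment, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (List<Integer> ps : assignment.values()) {
            for (int p : ps) {
                counts[p]++;
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> twoCopies = List.of("job-1", "job-2"); // both use the same group.id
        int partitions = 4;
        System.out.println("group:  " + groupAssignment(twoCopies, partitions));
        System.out.println("static: " + staticAssignment(twoCopies, partitions));
    }
}
```

With two copies of the job and four partitions, the group case gives `{job-1=[0, 2], job-2=[1, 3]}`, while the static case gives both jobs `[0, 1, 2, 3]`, i.e. every message is read (and, for a stateless job, published) twice.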

I'm cc'ing Gordon so he can correct me if I am wrong.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration

On 23/01/2019 18:02, Avi Levi wrote:

> Hi,
> This is quite confusing.
> I submitted the same stateless job twice (actually, I uploaded it once).
> However, when I place a message on Kafka, it seems that both jobs
> consume it and publish the same result (we publish the result to
> another Kafka topic, so I actually see the message duplicated on
> Kafka). How can that be? Both jobs are using the same group id (the
> group id is fixed, not generated).
>
> Kind regards
> Avi

Re: getting duplicate messages from duplicate jobs

Dawid Wysakowicz-2
In reply to this post by avilevi
Forgot to cc Gordon :)

Re: getting duplicate messages from duplicate jobs

Tzu-Li (Gordon) Tai
Hi,

Yes, Dawid is correct.

The "group.id" setting in Flink's Kafka Consumer is only used for group offset fetching and committing offsets back to Kafka (only for exposure purposes, not used for processing guarantees).
The Flink Kafka Consumer uses static partition assignment on the KafkaConsumer API, and not consumer group-based automatic partition assignments.
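As a further illustration of "static" assignment, below is a sketch modeled on how, as far as we recall, Flink 1.x's `KafkaTopicPartitionAssigner` maps a partition to one of the source's own parallel subtasks. Treat the exact formula as an assumption and check the Flink source for your version; the class name and topic name here are hypothetical. The relevant point is that the only inputs are the topic, the partition number and the job's parallelism: `group.id` plays no role, so a second copy of the job computes the same mapping and reads every partition again.

```java
// Hypothetical sketch, modeled on (not copied from) Flink's partition assigner.
class StaticAssignerSketch {

    static int assign(String topic, int partition, int numParallelSubtasks) {
        // Topic-dependent start index, so different topics don't all start at subtask 0.
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        // Partition ids start at 0, so partitions are spread round-robin from the start index.
        return (startIndex + partition) % numParallelSubtasks;
    }

    public static void main(String[] args) {
        String topic = "input-topic"; // hypothetical topic name
        int parallelism = 2;
        for (int p = 0; p < 4; p++) {
            // A second copy of the job computes exactly the same mapping,
            // so its subtasks read the same partitions again.
            System.out.println("partition " + p + " -> subtask " + assign(topic, p, parallelism));
        }
    }
}
```

Within one job this guarantees each partition has exactly one reader; across two independently submitted jobs nothing coordinates them, which matches the duplicates Avi observed.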

Cheers,
Gordon

On Sun, Jan 27, 2019 at 12:28 AM Dawid Wysakowicz <[hidden email]> wrote:
Forgot to cc Gordon :)

Re: getting duplicate messages from duplicate jobs

avilevi
OK, if you guys think it should be like that, then so be it. All I am saying is that this is not the standard behaviour of a Kafka consumer, at least according to the documentation. I understand that Flink implements things differently; my point is just that this does not follow Kafka's standard consumer-group semantics.
 

On Tue, Jan 29, 2019 at 9:47 AM Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi,

Yes, Dawid is correct.

The "group.id" setting in Flink's Kafka Consumer is only used for group offset fetching and committing offsets back to Kafka (only for exposure purposes, not used for processing guarantees).
The Flink Kafka Consumer uses static partition assignment on the KafkaConsumer API, and not consumer group-based automatic partition assignments.

Cheers,
Gordon

Re: getting duplicate messages from duplicate jobs

selvarajchennappan@gmail.com

On Wed, Jan 30, 2019 at 6:11 PM Avi Levi <[hidden email]> wrote:
OK, if you guys think it should be like that, then so be it. All I am saying is that this is not the standard behaviour of a Kafka consumer, at least according to the documentation. I understand that Flink implements things differently; my point is just that this does not follow Kafka's standard consumer-group semantics.

--
Regards,
Selvaraj C