How Flink Kafka Consumer works when it restarts

Hegde, Mahendra

Hi All,

I am a bit confused about how the Flink Kafka consumer works.

I read that Flink stores the Kafka message offsets in its checkpoints and uses them when the job restarts.

My question is: when exactly does Flink commit (i.e. confirm successful consumption) to the Kafka broker?

And when a Flink job restarts, does it send the last offset available in the checkpoint to the Kafka broker so that consumption starts from that point?

Or does the Kafka broker resume based on the last committed offset information it has?

(In other words, who manages the actual offset here: the Kafka broker or the Flink client?)

Thanks

Mahendra

Re: How Flink Kafka Consumer works when it restarts

Theo
Hi Mahendra, 

Flink regularly creates checkpoints, and you can also trigger savepoints manually. This data is managed and stored by Flink, and it also contains the Kafka offsets.

When restarting, you can configure the job to resume from the last checkpoint or from a savepoint.

You can additionally configure Flink to commit the offsets to Kafka; again, this happens on checkpoints only. If you don't restore the job from an existing checkpoint or savepoint (which is where Flink would look for the offsets first), you can configure Flink to start from the offsets committed to Kafka instead.
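To make "commit on checkpoint only" concrete, here is a minimal toy model in plain Python, not Flink code. It assumes checkpointing is enabled and offset committing is on (for the legacy `FlinkKafkaConsumer` this is `setCommitOffsetsOnCheckpoints(true)`, which is the default). The class and method names are hypothetical and only loosely mirror Flink's `snapshotState` / `notifyCheckpointComplete` hooks: the offset is first written into the checkpoint, and only after the whole checkpoint completes is it committed to Kafka, so the committed offset lags behind the consumer's real position.

```python
from typing import Dict, Optional


class ToyKafkaSource:
    """Toy model of commit-on-checkpoint. Hypothetical names, not Flink's API."""

    def __init__(self) -> None:
        self.position = 0                     # next offset this source will read
        self.pending: Dict[int, int] = {}     # checkpoint id -> offset stored in that checkpoint
        self.committed: Optional[int] = None  # offset the broker holds for the consumer group

    def poll(self, n: int) -> None:
        """Pretend to read n records from one Kafka partition."""
        self.position += n

    def snapshot_state(self, checkpoint_id: int) -> None:
        # The offset is written into Flink's checkpoint; nothing is sent to Kafka yet.
        self.pending[checkpoint_id] = self.position

    def notify_checkpoint_complete(self, checkpoint_id: int) -> None:
        # Only once the whole checkpoint has succeeded is its offset committed to Kafka.
        offset = self.pending.pop(checkpoint_id, None)
        if offset is None:
            return  # unknown or already subsumed checkpoint: nothing to commit
        self.committed = offset
        # Older in-flight checkpoints are subsumed by this newer one.
        for cid in [c for c in self.pending if c < checkpoint_id]:
            del self.pending[cid]


if __name__ == "__main__":
    src = ToyKafkaSource()
    src.poll(10)
    src.snapshot_state(checkpoint_id=1)  # checkpoint 1 stores offset 10
    src.poll(5)                          # consumption continues while checkpoint 1 is in flight
    print(src.committed)                 # → None: nothing committed to Kafka yet
    src.notify_checkpoint_complete(1)
    print(src.committed, src.position)   # → 10 15
```

When checkpointing is disabled, the Flink Kafka consumer instead relies on the Kafka client's own periodic auto-commit (`enable.auto.commit` / `auto.commit.interval.ms`). Either way, Flink's documentation notes that the committed offsets are not used for its fault-tolerance guarantees; they mainly expose the consumer's progress.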

Once the offsets are loaded, whether from a checkpoint, a savepoint, or Kafka, the consumer communicates with Kafka directly and polls messages starting from those offsets.
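The order in which start offsets are chosen can be sketched as a small Python function. This is a simplified model, not Flink's implementation: it assumes a restore covers every partition, and the mode names mirror the legacy `FlinkKafkaConsumer` methods `setStartFromGroupOffsets()` (the default), `setStartFromEarliest()` and `setStartFromLatest()`. Flink's documentation states that these startup modes do not apply when a job is restored from a checkpoint or savepoint; the stored offsets win. The `log_start` / `log_end` bounds and partition numbers are hypothetical.

```python
from typing import Dict, List, Optional

Offsets = Dict[int, int]  # partition -> next offset to read


def resolve_start_offsets(
    partitions: List[int],
    restored: Optional[Offsets],        # offsets from a checkpoint/savepoint, or None on a fresh start
    committed: Offsets,                 # offsets committed to Kafka for the consumer group
    startup_mode: str = "group-offsets",
    auto_offset_reset: str = "latest",  # Kafka consumer default
    log_start: int = 0,                 # hypothetical earliest retained offset
    log_end: int = 100,                 # hypothetical current end of each partition
) -> Offsets:
    if restored is not None:
        # Restored from Flink state: checkpoint/savepoint offsets win, startup mode is ignored.
        return dict(restored)
    start: Offsets = {}
    for p in partitions:
        if startup_mode == "earliest":
            start[p] = log_start
        elif startup_mode == "latest":
            start[p] = log_end
        elif startup_mode == "group-offsets":
            if p in committed:
                start[p] = committed[p]  # resume where the group last committed
            else:
                # Nothing committed for this partition: fall back to auto.offset.reset.
                start[p] = log_start if auto_offset_reset == "earliest" else log_end
        else:
            raise ValueError(f"unknown startup mode: {startup_mode}")
    return start


if __name__ == "__main__":
    # Restored from a checkpoint: offsets committed in Kafka are ignored.
    print(resolve_start_offsets([0, 1], {0: 42, 1: 7}, {0: 10, 1: 5}))  # → {0: 42, 1: 7}
    # Fresh start with the default group offsets; partition 1 has no commit yet.
    print(resolve_start_offsets([0, 1], None, {0: 10}))  # → {0: 10, 1: 100}
```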

Best regards
Theo


-------- Original message --------
From: "Hegde, Mahendra" <[hidden email]>
Date: Wed, 12 Feb 2020, 17:50
To: [hidden email]
Subject: How Flink Kafka Consumer works when it restarts

Re: Re: How Flink Kafka Consumer works when it restarts

Hegde, Mahendra

Thanks, Theo!

From: "[hidden email]" <[hidden email]>
Date: Thursday, 13 February 2020 at 12:13 AM
To: "Hegde, Mahendra" <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: [External] Re: How Flink Kafka Consumer works when it restarts

Re: Re: How Flink Kafka Consumer works when it restarts

Timothy Victor
What are the pros and cons of keeping the offsets in Kafka vs. in Flink? Is one more reliable than the other? Personally, I prefer having Flink manage them because they are intrinsically tied to its checkpointing mechanism, but I'm interested to learn from others' experiences.

Thanks

Tim

On Thu, Feb 13, 2020, 12:39 AM Hegde, Mahendra <[hidden email]> wrote:


Re: Re: How Flink Kafka Consumer works when it restarts

rmetzger0
The main benefit of letting Flink keep the offsets is that you get exactly-once semantics (with the offsets in Flink state, they are aligned with all your other state).
When storing the offsets in Kafka, you get at-least-once semantics (i.e. you may see some messages twice on restore / when continuing).
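A toy simulation in plain Python (not Flink code) can illustrate the difference. It assumes one hypothetical partition of 10 records and a running count as the job's state. In the first variant, the offset and the count are snapshotted together, as in a Flink checkpoint. In the second, the count lives in a hypothetical external store that is updated per record and never rolled back, while offsets are committed to Kafka separately. Both runs fail after 7 records, with the last snapshot/commit at offset 5. Note that "exactly once" here refers to the job's own state; end-to-end exactly-once delivery to external systems additionally needs transactional or idempotent sinks.

```python
NUM_RECORDS = 10  # offsets 0..9 of one hypothetical partition


def offsets_in_flink_state(snapshot_every: int, fail_at: int) -> int:
    """Offset and count are snapshotted together, like a Flink checkpoint."""
    pos, count = 0, 0
    snapshot = (0, 0)  # (next offset, count) from the last completed checkpoint
    failed = False
    while pos < NUM_RECORDS:
        if not failed and pos == fail_at:
            failed = True
            pos, count = snapshot  # restore offset AND state from the same snapshot
            continue
        count += 1  # process the record at offset `pos`
        pos += 1
        if pos % snapshot_every == 0:
            snapshot = (pos, count)
    return count


def offsets_in_kafka(commit_every: int, fail_at: int) -> int:
    """Count lives in an external store; offsets are committed to Kafka separately."""
    pos, external_count = 0, 0
    committed = 0  # last offset committed to Kafka
    failed = False
    while pos < NUM_RECORDS:
        if not failed and pos == fail_at:
            failed = True
            pos = committed  # resume from Kafka; the external count is NOT rolled back
            continue
        external_count += 1
        pos += 1
        if pos % commit_every == 0:
            committed = pos
    return external_count


if __name__ == "__main__":
    print(offsets_in_flink_state(5, 7))  # → 10: every record counted exactly once
    print(offsets_in_kafka(5, 7))        # → 12: offsets 5 and 6 were counted twice
```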

On Thu, Feb 13, 2020 at 2:56 PM Timothy Victor <[hidden email]> wrote:

Re: How Flink Kafka Consumer works when it restarts

Theo
I actually like to have both.

As Robert states, storing the offsets in Flink state ties them intrinsically to the rest of the job's state and allows for exactly-once processing.

Having them in Kafka is especially good for monitoring. In our monitoring setup, it's easy to visualize the committed offsets of Kafka consumer groups, whereas it is much harder to read data from Flink state. So committing offsets is a good way to track the maximum lag (measured against the committed offsets!).
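Computing lag from committed offsets is simple arithmetic: per partition, lag = log-end offset - committed offset, and the maximum lag is the largest of these. The helper below is a hypothetical sketch with made-up numbers; in practice, Kafka's `kafka-consumer-groups.sh --describe --group <group>` reports the same CURRENT-OFFSET, LOG-END-OFFSET and LAG columns. Because Flink commits only on completed checkpoints, this committed lag can overstate the consumer's real lag by roughly the records consumed since the last completed checkpoint.

```python
from typing import Dict


def committed_lag(log_end: Dict[int, int], committed: Dict[int, int]) -> Dict[int, int]:
    """Per-partition lag measured against the offsets committed to Kafka.

    Partitions without a committed offset are assumed to start at offset 0
    (a simplifying assumption for this sketch).
    """
    return {p: end - committed.get(p, 0) for p, end in log_end.items()}


if __name__ == "__main__":
    lag = committed_lag(log_end={0: 1200, 1: 950}, committed={0: 1100, 1: 950})
    print(lag)                # → {0: 100, 1: 0}
    print(max(lag.values()))  # → 100
```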

Also, if your use case allows it (which some of ours do), it's easy to deploy new versions of your pipeline where you simply delete the existing state and don't need a migration, yet still start from the last committed offsets rather than from the "maximum" past (e.g. 7 days ago). But that, of course, depends heavily on the use case and does not work with exactly-once semantics, for example.

Best regards
Theo


From: "Robert Metzger" <[hidden email]>
To: "Timothy Victor" <[hidden email]>
CC: "Mahendra Hegde" <[hidden email]>, "Theo Diefenthal" <[hidden email]>, "user" <[hidden email]>
Sent: Friday, 14 February 2020 15:26:44
Subject: Re: Re: How Flink Kafka Consumer works when it restarts


--
SCOOP Software GmbH - Gut Maarhausen - Eiler Straße 3 P - D-51107 Köln
Theo Diefenthal

T +49 221 801916-196 - F +49 221 801916-17 - M +49 160 90506575
[hidden email] - www.scoop-software.de
Registered office: Köln, Commercial register: Köln,
Commercial register number: HRB 36625
Managing directors: Dr. Oleg Balovnev, Frank Heinen,
Martin Müller-Rohde, Dr. Wolfgang Reddig, Roland Scheel