Hi all, I am a bit confused about how the Flink Kafka consumer works. I read that Flink stores the Kafka message offsets in checkpoints and uses them if the job restarts. My question is: when exactly does Flink commit the confirmation of successful consumption to the Kafka broker? And when a Flink job restarts, does it send the last offset available in the checkpoint to the Kafka broker so that consumption starts from that point? Or does the Kafka broker resume based on the last committed offset information it has? (I mean, who manages the actual offset here: the Kafka broker or the Flink client?) Thanks, Mahendra
Hi Mahendra, Flink regularly creates checkpoints, and you can also trigger savepoints manually. This data is managed and stored by Flink, and it also contains the Kafka offsets. When restarting, you can configure the job to restart from the last checkpoint or from a savepoint. You can also configure Flink to commit the offsets to Kafka. Again, this happens only when a checkpoint completes. If you don't let Flink restart from an existing checkpoint or savepoint (which is where it would look for the offsets first), you can configure Flink to start from the committed offsets instead. Once the offsets are loaded from a checkpoint, a savepoint, or Kafka, the consumer communicates with Kafka directly and asks it to poll messages starting from those offsets. Best regards, Theo -------- Original message -------- From: "Hegde, Mahendra" <[hidden email]> Date: Wed, 12 Feb 2020, 17:50 To: [hidden email] Subject: How Flink Kafka Consumer works when it restarts
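Theo describes a lookup order for where a restarted job begins reading. The toy Python model below sketches that order for a single partition. It is not Flink's code, and the function and parameter names are hypothetical.

In the real `FlinkKafkaConsumer`, offsets restored from a checkpoint or savepoint take precedence over the configured start position. "Start from the committed group offsets" is one of several start modes. `fallback_offset` is an assumed stand-in for whatever else applies, for example `auto.offset.reset` when no committed offset exists, or an explicit earliest/latest start mode.

```python
from typing import Optional, Tuple


def resolve_start_offset(
    restored_offset: Optional[int],
    start_from_group_offsets: bool,
    committed_offset: Optional[int],
    fallback_offset: int,
) -> Tuple[str, int]:
    """Toy model of where a restarted Kafka source begins reading one partition.

    restored_offset: offset found in the checkpoint/savepoint the job resumes from, if any
    start_from_group_offsets: whether the job is configured to start from committed offsets
    committed_offset: offset previously committed to Kafka for the consumer group, if any
    fallback_offset: hypothetical stand-in for any other start position (earliest, latest, ...)
    """
    # 1. Offsets restored from a checkpoint or savepoint take precedence.
    if restored_offset is not None:
        return ("checkpoint/savepoint", restored_offset)
    # 2. Without restored state, optionally resume from the offset committed to Kafka.
    if start_from_group_offsets and committed_offset is not None:
        return ("kafka-committed", committed_offset)
    # 3. Nothing to resume from: use the configured fallback position.
    return ("fallback", fallback_offset)


print(resolve_start_offset(120, True, 100, 0))   # ('checkpoint/savepoint', 120)
print(resolve_start_offset(None, True, 100, 0))  # ('kafka-committed', 100)
print(resolve_start_offset(None, True, None, 0)) # ('fallback', 0)
```

In every branch the consumer decides the offset and then asks Kafka to poll from it. The broker does not choose where the job resumes.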
In reply to this post by Hegde, Mahendra
Thanks, Theo!
What are the pros and cons of keeping the offsets in Kafka versus keeping them in Flink? Is one more reliable than the other? Personally, I prefer having Flink manage them because they are intrinsically tied to its checkpointing mechanism, but I'm interested to learn from others' experiences. Thanks, Tim On Thu, Feb 13, 2020, 12:39 AM Hegde, Mahendra <[hidden email]> wrote:
The main benefit of letting Flink keep the offsets is that you get exactly-once semantics: with the offsets in Flink state, they are aligned with all your other state. When you rely on the offsets stored in Kafka, you get at-least-once semantics (= you may see some messages twice on restore / when continuing). On Thu, Feb 13, 2020 at 2:56 PM Timothy Victor <[hidden email]> wrote:
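Robert's point can be illustrated with a toy Python model. It uses hypothetical values, not Flink's API, and a running sum as the operator state.

- **Restoring from a checkpoint.** The offset and the state come back together, so the job replays exactly the records the state has not yet seen.
- **Resuming from a Kafka-committed offset.** If that offset was committed independently of the state, the job replays records the state already includes.

```python
# Toy model, not Flink code: one partition, a running sum as operator state.
topic = [1, 2, 3, 4, 5, 6, 7, 8]  # hypothetical record values at offsets 0..7


def consume(start_offset: int, state: int) -> int:
    """Replay every record from start_offset to the end, adding it to the state."""
    for value in topic[start_offset:]:
        state += value
    return state


# Before the crash, the job had processed offsets 0..4, so its state was 15.

# Flink-managed: a checkpoint stores the next offset and the state together.
checkpoint = {"next_offset": 5, "state": 15}
exactly_once = consume(checkpoint["next_offset"], checkpoint["state"])

# Kafka-managed (hypothetical timing): offset 3 was committed, but the state that
# survived already includes offsets 3 and 4, so those records are applied again.
committed_offset = 3
surviving_state = 15
at_least_once = consume(committed_offset, surviving_state)

print(exactly_once)   # 36, same as sum(topic)
print(at_least_once)  # 45 = 36 + 4 + 5 (offsets 3 and 4 counted twice)
```

The duplicates come from the gap between the committed offset and what the state (or a downstream system) already reflects. The exact size of that gap in a real job is not modeled here.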
I actually like to have both. As Robert says, storing the offsets in state makes them more intrinsic and allows for exactly-once processing. Having them in Kafka is especially good for monitoring. In our monitoring setup, it's easy to visualize the committed offsets of Kafka consumer groups, whereas it is much harder to read data from Flink state, so that's a good way to track the maximum lag (of the committed offsets!). Also, if your use case allows it (some of ours do), it's easy to deploy new versions of your pipeline: you just delete the existing state and don't need a migration, yet you still start from the last committed offsets rather than from the "maximum" past (e.g. 7 days ago). Of course, that depends heavily on the use case and, for example, does not work with exactly-once semantics. Best regards, Theo From: "Robert Metzger" <[hidden email]> To: "Timothy Victor" <[hidden email]> CC: "Mahendra Hegde" <[hidden email]>, "Theo Diefenthal" <[hidden email]>, "user" <[hidden email]> Sent: Friday, 14 February 2020 15:26:44 Subject: Re: AW: How Flink Kafka Consumer works when it restarts
-- SCOOP Software GmbH - Gut Maarhausen - Eiler Straße 3 P - D-51107 Köln Theo Diefenthal T +49 221 801916-196 - F +49 221 801916-17 - M +49 160 90506575 [hidden email] - www.scoop-software.de Registered office: Köln, Commercial register: Köln, Commercial register number: HRB 36625 Management: Dr. Oleg Balovnev, Frank Heinen, Martin Müller-Rohde, Dr. Wolfgang Reddig, Roland Scheel
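Theo's monitoring point comes down to simple arithmetic on committed offsets. For each partition, the lag is the log-end offset minus the committed offset, and the maximum over all partitions is the number to watch.

The minimal Python sketch below uses hypothetical numbers. In practice, both offsets would come from Kafka's consumer-group tooling or your monitoring system. The helper name `consumer_lag` is made up for this illustration.

```python
from typing import Dict


def consumer_lag(end_offsets: Dict[int, int], committed: Dict[int, int]) -> Dict[int, int]:
    """Per-partition lag: log-end offset minus the committed offset.

    Partitions without a committed offset are treated as starting at 0 (an assumption).
    """
    return {p: end - committed.get(p, 0) for p, end in end_offsets.items()}


# Hypothetical numbers for a 3-partition topic.
end_offsets = {0: 1000, 1: 950, 2: 1200}  # latest offset in each partition
committed = {0: 990, 1: 900, 2: 1150}     # offsets the job committed on its last checkpoint

lags = consumer_lag(end_offsets, committed)
max_lag = max(lags.values())

print(lags)     # {0: 10, 1: 50, 2: 50}
print(max_lag)  # 50
```

Flink commits offsets only when a checkpoint completes. This committed lag therefore typically overstates the job's real processing lag, by roughly the records consumed since the last completed checkpoint. That is why Theo stresses that it is the *committed* lag.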