Hello,

When using the Kafka consumer connector, if we restore from a checkpoint or savepoint using a differently named consumer group than the one we originally ran the job with, will it still pick up exactly where it left off, or are we locked into using the same consumer group as before?

Thanks!

--
Rex Fenley | Software Engineer - Mobile and Backend
Remind.com
If you changed the consumer group in your new job, the group id will be the new one you set. The job will continue to consume the topics from the savepoint/checkpoint you specified, no matter whether the group id is the original one.

Rex Fenley <[hidden email]> wrote on Mon, Jan 18, 2021 at 12:53 PM:
Hi Rex,

I believe this section answers your question [1]

Piotrek

On Mon, Jan 18, 2021 at 09:00, 赵一旦 <[hidden email]> wrote:
Thank you,

Some parts that stick out:

> The Flink Kafka Consumer allows configuring the behaviour of how offsets are committed back to Kafka brokers. Note that the Flink Kafka Consumer does not rely on the committed offsets for fault tolerance guarantees. The committed offsets are only a means to expose the consumer’s progress for monitoring purposes.

I read this as, "The offsets committed to Kafka are ignored, the offsets committed within a checkpoint are used".

> With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all its Kafka offsets, together with the state of other operations. In case of a job failure, Flink will restore the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were stored in the checkpoint.

This seems to say something similar.

So from the sounds of things, regardless of the consumer group's offsets, it will always start from a checkpoint or savepoints offsets if there are some (unless checkpointing offsets is turned off).

Is this interpretation correct?

Thanks!

On Mon, Jan 18, 2021 at 3:23 AM Piotr Nowojski <[hidden email]> wrote:
Hi,

> I read this as, "The offsets committed to Kafka are ignored, the offsets committed within a checkpoint are used".

Yes, exactly.

> So from the sounds of things, regardless of the consumer group's offsets, it will always start from a checkpoint or savepoints offsets if there are some (unless checkpointing offsets is turned off).

Yes. But keep in mind this part:

> setStartFromGroupOffsets (default behaviour): Start reading partitions from the consumer group’s (group.id setting in the consumer properties) committed offsets in Kafka brokers. If offsets could not be found for a partition, the auto.offset.reset setting in the properties will be used.

As I understand it, if you are using the default `setStartFromGroupOffsets` and you happen to change `group.id` (which is what I believe you were asking about in the first e-mail), then after the change FlinkKafkaConsumer will not be able to find the previously saved offsets in Flink's state, and it will start reading from a completely new set of offsets, the same way as if this were a freshly started new job without any state. Those new offsets would be as specified via `auto.offset.reset`.

Piotrek

On Mon, Jan 18, 2021 at 18:12, Rex Fenley <[hidden email]> wrote:
Thank you,

That's unfortunate, because I imagine we will often want to duplicate a job in order to do some testing out-of-band from the normal job while slightly tweaking / tuning things. Is there any way to transfer offsets between consumer groups?

On Tue, Jan 19, 2021 at 5:45 AM Piotr Nowojski <[hidden email]> wrote:
Hi Rex,

Sorry, I might have misled you. I think you were right in your previous email:

> So from the sounds of things, regardless of the consumer group's offsets, it will always start from a checkpoint or savepoints offsets if there are some (unless checkpointing offsets is turned off).
>
> Is this interpretation correct?

I think this is correct. `setStartFromGroupOffsets` and the other `setStart*` variants take effect only if there are no offsets stored in the state. I would suggest you try it out regardless.

If you want to duplicate a job for some testing, each of the duplicated jobs will have its own set of offsets and they will read records independently, but starting from the same starting point (when the job was duplicated).

Piotrek

On Tue, Jan 19, 2021 at 20:19, Rex Fenley <[hidden email]> wrote:
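For anyone reading this thread later, the precedence described above can be sketched as a small toy model in plain Java. This is not Flink's actual implementation: the class `StartOffsetSketch`, the method `resolveStartOffset`, the `Reset` enum and the partition/group names are all hypothetical, and offsets are simplified to "the next offset to read". It only illustrates the rule that offsets restored from a checkpoint or savepoint win, and that the group's committed offsets and `auto.offset.reset` are consulted only when the state holds nothing for a partition.

```java
import java.util.Map;

/** Toy model of start-offset precedence. Hypothetical names, not Flink code. */
public class StartOffsetSketch {

    /** Stand-in for the auto.offset.reset setting. */
    public enum Reset { EARLIEST, LATEST }

    /**
     * Picks the offset to start reading from for one partition.
     * Order: restored state, then the group's committed offset, then the reset policy.
     */
    public static long resolveStartOffset(
            String partition,
            String groupId,
            Map<String, Long> restoredState,                 // offsets from a checkpoint/savepoint
            Map<String, Map<String, Long>> committedByGroup, // offsets committed to Kafka, per group
            Reset reset,
            long earliest,
            long latest) {
        // 1. Offsets stored in the restored state take priority; group.id is not consulted.
        Long fromState = restoredState.get(partition);
        if (fromState != null) {
            return fromState;
        }
        // 2. No state: behave like setStartFromGroupOffsets and look up the group's commits.
        Long fromGroup = committedByGroup.getOrDefault(groupId, Map.of()).get(partition);
        if (fromGroup != null) {
            return fromGroup;
        }
        // 3. Nothing committed for this group: fall back to the reset policy.
        return reset == Reset.EARLIEST ? earliest : latest;
    }

    public static void main(String[] args) {
        Map<String, Long> state = Map.of("topic-0", 120L);
        Map<String, Map<String, Long>> committed =
                Map.of("original-group", Map.of("topic-0", 100L));

        // Restored from a savepoint with a renamed group: the state offset is used.
        System.out.println(resolveStartOffset(
                "topic-0", "renamed-group", state, committed, Reset.LATEST, 0L, 500L)); // prints 120

        // Fresh start with a renamed group and no state: the reset policy applies.
        System.out.println(resolveStartOffset(
                "topic-0", "renamed-group", Map.of(), committed, Reset.LATEST, 0L, 500L)); // prints 500

        // Fresh start with the original group and no state: the committed offset is used.
        System.out.println(resolveStartOffset(
                "topic-0", "original-group", Map.of(), committed, Reset.LATEST, 0L, 500L)); // prints 100
    }
}
```

In this model a duplicated job restored from the same savepoint starts at the same offset no matter what `group.id` it uses, and then advances its own copy of the offsets independently, which matches the behaviour described in the message above. As suggested there, it is still worth trying this against a real job.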
Thanks! On Tue, Jan 19, 2021 at 9:47 PM Piotr Nowojski <[hidden email]> wrote: