Restoring from a checkpoint or savepoint on a different Kafka consumer group

classic Classic list List threaded Threaded
8 messages Options
Reply | Threaded
Open this post in threaded view
|

Restoring from a checkpoint or savepoint on a different Kafka consumer group

Rex Fenley
Hello,

When using the Kafka consumer connector, if we restore a from a checkpoint or savepoint using a differently named consumer group than the one we originally ran a job with will it still pick up exactly where it left off or are you locked into using the same consumer group as before?

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Reply | Threaded
Open this post in threaded view
|

Re: Restoring from a checkpoint or savepoint on a different Kafka consumer group

yidan zhao
If you changed the consumer group in your new job, the group id will be the new one you set.
The job will continue to consumer the topics from the savepoint/checkpoint you specified no matter whether the group id is the original one?

Rex Fenley <[hidden email]> 于2021年1月18日周一 下午12:53写道:
Hello,

When using the Kafka consumer connector, if we restore a from a checkpoint or savepoint using a differently named consumer group than the one we originally ran a job with will it still pick up exactly where it left off or are you locked into using the same consumer group as before?

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Reply | Threaded
Open this post in threaded view
|

Re: Restoring from a checkpoint or savepoint on a different Kafka consumer group

Piotr Nowojski-4

pon., 18 sty 2021 o 09:00 赵一旦 <[hidden email]> napisał(a):
If you changed the consumer group in your new job, the group id will be the new one you set.
The job will continue to consumer the topics from the savepoint/checkpoint you specified no matter whether the group id is the original one?

Rex Fenley <[hidden email]> 于2021年1月18日周一 下午12:53写道:
Hello,

When using the Kafka consumer connector, if we restore a from a checkpoint or savepoint using a differently named consumer group than the one we originally ran a job with will it still pick up exactly where it left off or are you locked into using the same consumer group as before?

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Reply | Threaded
Open this post in threaded view
|

Re: Restoring from a checkpoint or savepoint on a different Kafka consumer group

Rex Fenley
Thank you,

Some parts that stick out
>The Flink Kafka Consumer allows configuring the behaviour of how offsets are committed back to Kafka brokers. Note that the Flink Kafka Consumer does not rely on the committed offsets for fault tolerance guarantees. The committed offsets are only a means to expose the consumer’s progress for monitoring purposes.

I read this as, "The offsets committed to Kafka are ignored, the offsets committed within a checkpoint are used".

>With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all its Kafka offsets, together with the state of other operations. In case of a job failure, Flink will restore the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were stored in the checkpoint.

This seems to say something similar.

So from the sounds of things, regardless of the consumer group's offsets, it will always start from a checkpoint or savepoints offsets if there are some (unless checkpointing offsets is turned off).

Is this interpretation correct?

Thanks!


On Mon, Jan 18, 2021 at 3:23 AM Piotr Nowojski <[hidden email]> wrote:

pon., 18 sty 2021 o 09:00 赵一旦 <[hidden email]> napisał(a):
If you changed the consumer group in your new job, the group id will be the new one you set.
The job will continue to consumer the topics from the savepoint/checkpoint you specified no matter whether the group id is the original one?

Rex Fenley <[hidden email]> 于2021年1月18日周一 下午12:53写道:
Hello,

When using the Kafka consumer connector, if we restore a from a checkpoint or savepoint using a differently named consumer group than the one we originally ran a job with will it still pick up exactly where it left off or are you locked into using the same consumer group as before?

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Reply | Threaded
Open this post in threaded view
|

Re: Restoring from a checkpoint or savepoint on a different Kafka consumer group

Piotr Nowojski-4
Hi,

> I read this as, "The offsets committed to Kafka are ignored, the offsets committed within a checkpoint are used".

yes, exactly

> So from the sounds of things, regardless of the consumer group's offsets, it will always start from a checkpoint or savepoints offsets if there are some (unless checkpointing offsets is turned off).

Yes. But, keep in mind this part:

> setStartFromGroupOffsets (default behaviour): Start reading partitions from the consumer group’s (group.id setting in the consumer properties) committed offsets in Kafka brokers. If offsets could not be found for a partition, the auto.offset.reset setting in the properties will be used.

As I understand it, if you are using the default `setStartFromGroupOffsets`, and you happen to change `group.id` (which is what I believe you were asking about in the first e-mail), after changing the `group.id` FlinkKafkaConsumer will not be able to found previously saved offsets in the Flink's state and it will start reading from completely new set of offsets. The same way as if this would be a freshly started new job without any state. Those new offsets would be as specified/defined via `auto.offset.reset`.

Piotrek


pon., 18 sty 2021 o 18:12 Rex Fenley <[hidden email]> napisał(a):
Thank you,

Some parts that stick out
>The Flink Kafka Consumer allows configuring the behaviour of how offsets are committed back to Kafka brokers. Note that the Flink Kafka Consumer does not rely on the committed offsets for fault tolerance guarantees. The committed offsets are only a means to expose the consumer’s progress for monitoring purposes.

I read this as, "The offsets committed to Kafka are ignored, the offsets committed within a checkpoint are used".

>With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all its Kafka offsets, together with the state of other operations. In case of a job failure, Flink will restore the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were stored in the checkpoint.

This seems to say something similar.

So from the sounds of things, regardless of the consumer group's offsets, it will always start from a checkpoint or savepoints offsets if there are some (unless checkpointing offsets is turned off).

Is this interpretation correct?

Thanks!


On Mon, Jan 18, 2021 at 3:23 AM Piotr Nowojski <[hidden email]> wrote:

pon., 18 sty 2021 o 09:00 赵一旦 <[hidden email]> napisał(a):
If you changed the consumer group in your new job, the group id will be the new one you set.
The job will continue to consumer the topics from the savepoint/checkpoint you specified no matter whether the group id is the original one?

Rex Fenley <[hidden email]> 于2021年1月18日周一 下午12:53写道:
Hello,

When using the Kafka consumer connector, if we restore a from a checkpoint or savepoint using a differently named consumer group than the one we originally ran a job with will it still pick up exactly where it left off or are you locked into using the same consumer group as before?

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Reply | Threaded
Open this post in threaded view
|

Re: Restoring from a checkpoint or savepoint on a different Kafka consumer group

Rex Fenley
Thank you,

That's unfortunate, because I imagine we often will want to duplicate a job in order to do some testing out-of-bound from the normal job while slightly tweaking / tuning things. Is there any way to transfer offsets between consumer groups?

On Tue, Jan 19, 2021 at 5:45 AM Piotr Nowojski <[hidden email]> wrote:
Hi,

> I read this as, "The offsets committed to Kafka are ignored, the offsets committed within a checkpoint are used".

yes, exactly

> So from the sounds of things, regardless of the consumer group's offsets, it will always start from a checkpoint or savepoints offsets if there are some (unless checkpointing offsets is turned off).

Yes. But, keep in mind this part:

> setStartFromGroupOffsets (default behaviour): Start reading partitions from the consumer group’s (group.id setting in the consumer properties) committed offsets in Kafka brokers. If offsets could not be found for a partition, the auto.offset.reset setting in the properties will be used.

As I understand it, if you are using the default `setStartFromGroupOffsets`, and you happen to change `group.id` (which is what I believe you were asking about in the first e-mail), after changing the `group.id` FlinkKafkaConsumer will not be able to found previously saved offsets in the Flink's state and it will start reading from completely new set of offsets. The same way as if this would be a freshly started new job without any state. Those new offsets would be as specified/defined via `auto.offset.reset`.

Piotrek


pon., 18 sty 2021 o 18:12 Rex Fenley <[hidden email]> napisał(a):
Thank you,

Some parts that stick out
>The Flink Kafka Consumer allows configuring the behaviour of how offsets are committed back to Kafka brokers. Note that the Flink Kafka Consumer does not rely on the committed offsets for fault tolerance guarantees. The committed offsets are only a means to expose the consumer’s progress for monitoring purposes.

I read this as, "The offsets committed to Kafka are ignored, the offsets committed within a checkpoint are used".

>With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all its Kafka offsets, together with the state of other operations. In case of a job failure, Flink will restore the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were stored in the checkpoint.

This seems to say something similar.

So from the sounds of things, regardless of the consumer group's offsets, it will always start from a checkpoint or savepoints offsets if there are some (unless checkpointing offsets is turned off).

Is this interpretation correct?

Thanks!


On Mon, Jan 18, 2021 at 3:23 AM Piotr Nowojski <[hidden email]> wrote:

pon., 18 sty 2021 o 09:00 赵一旦 <[hidden email]> napisał(a):
If you changed the consumer group in your new job, the group id will be the new one you set.
The job will continue to consumer the topics from the savepoint/checkpoint you specified no matter whether the group id is the original one?

Rex Fenley <[hidden email]> 于2021年1月18日周一 下午12:53写道:
Hello,

When using the Kafka consumer connector, if we restore a from a checkpoint or savepoint using a differently named consumer group than the one we originally ran a job with will it still pick up exactly where it left off or are you locked into using the same consumer group as before?

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Reply | Threaded
Open this post in threaded view
|

Re: Restoring from a checkpoint or savepoint on a different Kafka consumer group

Piotr Nowojski-5
Hi Rex,

Sorry, I might have misled you. I think you were right in your previous email

> So from the sounds of things, regardless of the consumer group's offsets, it will always start from a checkpoint or savepoints offsets if there are some (unless checkpointing offsets is turned off).
>
> Is this interpretation correct?

I think this is correct. `setStartFromGroupOffsets` and other `setStart*` variants take effect only if there are no offsets stored in the state. I would suggest you try it out regardless.

If you want to duplicate a job for some testing, each of the duplicated jobs will have it's own sets of offsets and they will read records independently, but starting from the same starting point (when the job was duplicated). 

Piotrek

wt., 19 sty 2021 o 20:19 Rex Fenley <[hidden email]> napisał(a):
Thank you,

That's unfortunate, because I imagine we often will want to duplicate a job in order to do some testing out-of-bound from the normal job while slightly tweaking / tuning things. Is there any way to transfer offsets between consumer groups?

On Tue, Jan 19, 2021 at 5:45 AM Piotr Nowojski <[hidden email]> wrote:
Hi,

> I read this as, "The offsets committed to Kafka are ignored, the offsets committed within a checkpoint are used".

yes, exactly

> So from the sounds of things, regardless of the consumer group's offsets, it will always start from a checkpoint or savepoints offsets if there are some (unless checkpointing offsets is turned off).

Yes. But, keep in mind this part:

> setStartFromGroupOffsets (default behaviour): Start reading partitions from the consumer group’s (group.id setting in the consumer properties) committed offsets in Kafka brokers. If offsets could not be found for a partition, the auto.offset.reset setting in the properties will be used.

As I understand it, if you are using the default `setStartFromGroupOffsets`, and you happen to change `group.id` (which is what I believe you were asking about in the first e-mail), after changing the `group.id` FlinkKafkaConsumer will not be able to found previously saved offsets in the Flink's state and it will start reading from completely new set of offsets. The same way as if this would be a freshly started new job without any state. Those new offsets would be as specified/defined via `auto.offset.reset`.

Piotrek


pon., 18 sty 2021 o 18:12 Rex Fenley <[hidden email]> napisał(a):
Thank you,

Some parts that stick out
>The Flink Kafka Consumer allows configuring the behaviour of how offsets are committed back to Kafka brokers. Note that the Flink Kafka Consumer does not rely on the committed offsets for fault tolerance guarantees. The committed offsets are only a means to expose the consumer’s progress for monitoring purposes.

I read this as, "The offsets committed to Kafka are ignored, the offsets committed within a checkpoint are used".

>With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all its Kafka offsets, together with the state of other operations. In case of a job failure, Flink will restore the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were stored in the checkpoint.

This seems to say something similar.

So from the sounds of things, regardless of the consumer group's offsets, it will always start from a checkpoint or savepoints offsets if there are some (unless checkpointing offsets is turned off).

Is this interpretation correct?

Thanks!


On Mon, Jan 18, 2021 at 3:23 AM Piotr Nowojski <[hidden email]> wrote:

pon., 18 sty 2021 o 09:00 赵一旦 <[hidden email]> napisał(a):
If you changed the consumer group in your new job, the group id will be the new one you set.
The job will continue to consumer the topics from the savepoint/checkpoint you specified no matter whether the group id is the original one?

Rex Fenley <[hidden email]> 于2021年1月18日周一 下午12:53写道:
Hello,

When using the Kafka consumer connector, if we restore a from a checkpoint or savepoint using a differently named consumer group than the one we originally ran a job with will it still pick up exactly where it left off or are you locked into using the same consumer group as before?

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Reply | Threaded
Open this post in threaded view
|

Re: Restoring from a checkpoint or savepoint on a different Kafka consumer group

Rex Fenley
Thanks!

On Tue, Jan 19, 2021 at 9:47 PM Piotr Nowojski <[hidden email]> wrote:
Hi Rex,

Sorry, I might have misled you. I think you were right in your previous email

> So from the sounds of things, regardless of the consumer group's offsets, it will always start from a checkpoint or savepoints offsets if there are some (unless checkpointing offsets is turned off).
>
> Is this interpretation correct?

I think this is correct. `setStartFromGroupOffsets` and other `setStart*` variants take effect only if there are no offsets stored in the state. I would suggest you try it out regardless.

If you want to duplicate a job for some testing, each of the duplicated jobs will have it's own sets of offsets and they will read records independently, but starting from the same starting point (when the job was duplicated). 

Piotrek

wt., 19 sty 2021 o 20:19 Rex Fenley <[hidden email]> napisał(a):
Thank you,

That's unfortunate, because I imagine we often will want to duplicate a job in order to do some testing out-of-bound from the normal job while slightly tweaking / tuning things. Is there any way to transfer offsets between consumer groups?

On Tue, Jan 19, 2021 at 5:45 AM Piotr Nowojski <[hidden email]> wrote:
Hi,

> I read this as, "The offsets committed to Kafka are ignored, the offsets committed within a checkpoint are used".

yes, exactly

> So from the sounds of things, regardless of the consumer group's offsets, it will always start from a checkpoint or savepoints offsets if there are some (unless checkpointing offsets is turned off).

Yes. But, keep in mind this part:

> setStartFromGroupOffsets (default behaviour): Start reading partitions from the consumer group’s (group.id setting in the consumer properties) committed offsets in Kafka brokers. If offsets could not be found for a partition, the auto.offset.reset setting in the properties will be used.

As I understand it, if you are using the default `setStartFromGroupOffsets`, and you happen to change `group.id` (which is what I believe you were asking about in the first e-mail), after changing the `group.id` FlinkKafkaConsumer will not be able to found previously saved offsets in the Flink's state and it will start reading from completely new set of offsets. The same way as if this would be a freshly started new job without any state. Those new offsets would be as specified/defined via `auto.offset.reset`.

Piotrek


pon., 18 sty 2021 o 18:12 Rex Fenley <[hidden email]> napisał(a):
Thank you,

Some parts that stick out
>The Flink Kafka Consumer allows configuring the behaviour of how offsets are committed back to Kafka brokers. Note that the Flink Kafka Consumer does not rely on the committed offsets for fault tolerance guarantees. The committed offsets are only a means to expose the consumer’s progress for monitoring purposes.

I read this as, "The offsets committed to Kafka are ignored, the offsets committed within a checkpoint are used".

>With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all its Kafka offsets, together with the state of other operations. In case of a job failure, Flink will restore the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were stored in the checkpoint.

This seems to say something similar.

So from the sounds of things, regardless of the consumer group's offsets, it will always start from a checkpoint or savepoints offsets if there are some (unless checkpointing offsets is turned off).

Is this interpretation correct?

Thanks!


On Mon, Jan 18, 2021 at 3:23 AM Piotr Nowojski <[hidden email]> wrote:

pon., 18 sty 2021 o 09:00 赵一旦 <[hidden email]> napisał(a):
If you changed the consumer group in your new job, the group id will be the new one you set.
The job will continue to consumer the topics from the savepoint/checkpoint you specified no matter whether the group id is the original one?

Rex Fenley <[hidden email]> 于2021年1月18日周一 下午12:53写道:
Hello,

When using the Kafka consumer connector, if we restore a from a checkpoint or savepoint using a differently named consumer group than the one we originally ran a job with will it still pick up exactly where it left off or are you locked into using the same consumer group as before?

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US