How to advance Kafka offsets manually with checkpointing enabled on Flink (TableAPI)

Rex Fenley
Hello,

I'm wondering how, in the event of a poison pill record in Kafka, to advance a partition's checkpointed offset by 1 when using the Table API/SQL.

It is my understanding that when checkpointing is enabled and a job is started from a checkpoint, Flink uses the offsets stored in its checkpoint rather than the offsets committed to Kafka.
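
For reference, our source table is defined roughly like the following (names and connection details are made up); as I understand it, 'scan.startup.mode' = 'group-offsets' only controls where a fresh start without a checkpoint begins reading, and is ignored on restore:

// Illustrative only -- table, topic, group id, and broker address are placeholders.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KafkaSourceTableExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // checkpointing enabled, so restores use checkpointed offsets
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        tEnv.executeSql(
            "CREATE TABLE events (\n"
          + "  id BIGINT,\n"
          + "  payload STRING\n"
          + ") WITH (\n"
          + "  'connector' = 'kafka',\n"
          + "  'topic' = 'events',\n"
          + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
          + "  'properties.group.id' = 'my-flink-job',\n"
          + "  'scan.startup.mode' = 'group-offsets',\n" // only consulted on a fresh start
          + "  'format' = 'json'\n"
          + ")");
    }
}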

In the event that a poison pill record in Kafka is crashing the Flink job, we may want to simply advance the checkpointed offset for that partition by 1, past the poison record, and then continue operating as normal. However, we do not want to lose any other state in Flink.

I'm wondering how to go about this, then. It's easy enough to advance Kafka's committed offsets. Is there a way to tell Flink to ignore its checkpointed offsets and instead respect the offsets committed to Kafka for the consumer group when restoring from a checkpoint?
If so, we could:
1. Advance Kafka's committed offsets past the poison record.
2. Start the job from the checkpoint, have it pick up Kafka's offsets, and let it take a new checkpoint containing the advanced offsets.
3. Stop the job and rerun it from that new checkpoint, so it uses Flink's now-advanced offsets.
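
For step 1, I imagine something like the following with the Kafka AdminClient (untested sketch; the group id, topic, partition, and bootstrap servers are placeholders):

// Untested sketch: bump the committed offset for one partition past a poison record.
// The consumer group must be inactive (the Flink job stopped) for the commit to succeed.
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class AdvanceCommittedOffset {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            String group = "my-flink-job";
            TopicPartition tp = new TopicPartition("events", 3);

            // Read the currently committed offset for the partition.
            Map<TopicPartition, OffsetAndMetadata> committed =
                admin.listConsumerGroupOffsets(group)
                     .partitionsToOffsetAndMetadata()
                     .get();
            long current = committed.get(tp).offset();

            // Commit current + 1 to skip the poison record.
            admin.alterConsumerGroupOffsets(
                      group,
                      Collections.singletonMap(tp, new OffsetAndMetadata(current + 1)))
                 .all()
                 .get();
        }
    }
}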

Is this possible? Are there any better strategies?

Thanks!

--

Rex Fenley | Software Engineer - Mobile and Backend
Remind.com

Re: How to advance Kafka offsets manually with checkpointing enabled on Flink (TableAPI)

Dawid Wysakowicz

Hi Rex,

The approach you described is definitely possible in the DataStream API. You could change the uid of your Kafka source and restart the job from your checkpoint with the allowNonRestoredState option enabled [1]; the source would then be treated as a new operator and would start from its configured startup position (e.g. the group offsets committed to Kafka). I am afraid, though, that it is not possible to change the uid in the Table API/SQL.
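
Roughly, the DataStream side of it could look like the sketch below (untested; the topic, group id, and uid are placeholders):

// Untested sketch for the DataStream API approach: give the Kafka source a new uid
// so its old checkpointed state is not restored.
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ResetKafkaSourceUid {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-flink-job");

        FlinkKafkaConsumer<String> source =
            new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props);
        // With no restored state, the source falls back to the group offsets committed to Kafka.
        source.setStartFromGroupOffsets();

        env.addSource(source)
           // Changed from e.g. "kafka-source-v1": the operator is treated as new on restore.
           .uid("kafka-source-v2")
           .print();

        env.execute("reset-kafka-source-uid");

        // Then restart from the checkpoint, dropping the old source state:
        //   bin/flink run -s <checkpointPath> --allowNonRestoredState <jobJar>
    }
}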

Another approach that you could try is to edit the checkpoint via the State Processor API[2] and increase the checkpointed offsets.
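
As a rough idea of what that involves, something like the following could be used to inspect the offsets stored in a checkpoint (untested sketch against Flink 1.12; the operator uid and the internal state name and type are assumptions based on the FlinkKafkaConsumer internals and may differ in your setup, especially since the Table API/SQL generates operator uids automatically):

// Untested sketch: read the Kafka offsets stored in a checkpoint/savepoint with the
// State Processor API (Flink 1.12, DataSet-based). The uid "kafka-source-uid", the state
// name "topic-partition-offset-states", and the state type are assumptions about the
// FlinkKafkaConsumer internals; verify them against your connector version.
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

public class InspectCheckpointedOffsets {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ExistingSavepoint savepoint =
            Savepoint.load(env, "file:///path/to/checkpoint", new MemoryStateBackend());

        // The Kafka source keeps its partition offsets in union list state.
        DataSet<Tuple2<KafkaTopicPartition, Long>> offsets =
            savepoint.readUnionState(
                "kafka-source-uid",              // assumed operator uid
                "topic-partition-offset-states", // assumed internal state name
                TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {}));

        offsets.print();

        // Writing a modified savepoint back (bumping one offset by 1) would additionally
        // require bootstrapping the new state for that operator and calling savepoint.write(...).
    }
}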

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/cli.html#starting-a-job-from-a-savepoint

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/libs/state_processor_api.html

Re: How to advance Kafka offsets manually with checkpointing enabled on Flink (TableAPI)

Rex Fenley
Thanks for the info!

--

Rex Fenley | Software Engineer - Mobile and Backend
Remind.com