Flink Kafka offsets

Rex Fenley
Hello,

I've been trying to configure the start position of a Flink Kafka consumer so that, when there is no committed offset, it always starts at the beginning. The typical way to do this seems to be setting auto.offset.reset=earliest; however, I don't see that configuration property in the documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html

However, I do see scan.startup.mode = earliest-offset, but from the docs it sounds like this would mean it would never commit an offset and Flink would always start consuming from the beginning of the Kafka stream, which is not what I want.

Is this the case, or am I misunderstanding? How can I get the behavior I want, where committed offsets are respected but a missing offset means starting at the beginning of the Kafka log?

Thanks!
--
Rex Fenley  |  Software Engineer - Mobile and Backend
Remind.com

Re: Flink Kafka offsets

Dawid Wysakowicz-2

Hey Rex,

I agree the documentation might be slightly misleading. To get the full picture of that configuration I'd suggest having a look at the DataStream Kafka connector page[1]. The Table connector is just a wrapper around the DataStream one.

Let me also try to clarify it a bit more. In Flink, there are two places where offsets are committed:

1) Flink's checkpoints/savepoints. These always take the highest priority. For example, when the job is restarted after a failure, it uses the offsets stored in the last successful checkpoint.

2) Upon a checkpoint, Flink can also write the offsets back to Kafka. This is enabled by default in the DataStream API, and in the Table API if you provide properties.group.id[2]. It works only if checkpointing is enabled. If you disable checkpoints, you can still auto-commit offsets from the underlying Kafka consumer via properties.enable.auto.commit / properties.auto.commit.interval.ms (btw, you can pass any Kafka option with a properties.* prefix).
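
To illustrate the properties.* prefix mentioned above, here is a rough Python sketch of how prefixed table options could map onto plain Kafka client properties. The function name and the sample values (group id, broker address) are hypothetical, and this only models the idea; it is not the connector's actual code.

```python
# Illustrative model only: strip the "properties." prefix from table options
# to obtain the keys the underlying Kafka consumer would see.
PREFIX = "properties."

def kafka_client_properties(table_options):
    """Return the Kafka client properties contained in the table options."""
    return {
        key[len(PREFIX):]: value
        for key, value in table_options.items()
        if key.startswith(PREFIX)
    }

# Hypothetical option values, for illustration.
table_options = {
    "connector": "kafka",
    "properties.bootstrap.servers": "localhost:9092",
    "properties.group.id": "my-group",
    "properties.enable.auto.commit": "true",
    "properties.auto.commit.interval.ms": "5000",
}

client_props = kafka_client_properties(table_options)
```

Options without the prefix, such as connector, are handled by the Table connector itself and do not reach the Kafka client in this model.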

Having explained that, if you set scan.startup.mode and do not restore from a checkpoint/savepoint:

* group-offsets -> it will start consuming from the offsets committed in Kafka for the configured group.id; if there are none, it should use the properties.auto.offset.reset option

* earliest-offset -> it will ignore the offsets committed in Kafka and start from the earliest offset.
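
The priority order described above can be sketched as a small Python model. The function, its parameters, and the sample values (topic, group id, broker address) are hypothetical and only mirror the explanation in this thread, not Flink's implementation; the "latest" default reflects the Kafka consumer's default for auto.offset.reset.

```python
# Illustrative model of the start-position rules explained in this thread.
def resolve_start_position(startup_mode, restored_offset=None,
                           committed_offset=None, auto_offset_reset="latest"):
    """Return (source, offset) describing where consumption would begin."""
    # 1) Offsets restored from a checkpoint/savepoint always win.
    if restored_offset is not None:
        return ("checkpoint", restored_offset)
    # 2) earliest-offset ignores whatever is committed in Kafka.
    if startup_mode == "earliest-offset":
        return ("earliest", None)
    # 3) group-offsets uses the committed offset, else auto.offset.reset.
    if startup_mode == "group-offsets":
        if committed_offset is not None:
            return ("committed", committed_offset)
        return (auto_offset_reset, None)
    raise ValueError("startup mode not covered by this sketch: " + startup_mode)

# Table options that should give the behavior asked for in the question:
# respect committed offsets, otherwise start from the beginning.
desired_options = {
    "connector": "kafka",
    "topic": "my-topic",                               # hypothetical
    "properties.bootstrap.servers": "localhost:9092",  # hypothetical
    "properties.group.id": "my-group",                 # hypothetical
    "properties.auto.offset.reset": "earliest",
    "scan.startup.mode": "group-offsets",
}

first_run = resolve_start_position(
    desired_options["scan.startup.mode"],
    auto_offset_reset=desired_options["properties.auto.offset.reset"],
)
later_run = resolve_start_position(
    desired_options["scan.startup.mode"],
    committed_offset=42,
    auto_offset_reset=desired_options["properties.auto.offset.reset"],
)
```

In this model, first_run falls back to the earliest offset because nothing is committed yet, while later_run resumes from the committed offset.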

Hope it helps.

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#kafka-consumers-start-position-configuration

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#properties-group-id

On 13/10/2020 07:43, Rex Fenley wrote:
> [...]

Re: Flink Kafka offsets

Rex Fenley
Thanks for the explanation, this was all super helpful.

On Tue, Oct 13, 2020 at 2:16 AM Dawid Wysakowicz <[hidden email]> wrote:
> [...]