Upgrading from 1.4 to 1.8, losing Kafka consumer state

Upgrading from 1.4 to 1.8, losing Kafka consumer state

Nikolas Davis
Howdy,

We're in the process of upgrading from 1.4 to 1.8. When restoring state to the new cluster (using a savepoint), we are seeing our Kafka consumers restart from the earliest offset. We get no other indication that our state was not accepted as part of the deploy: we are not allowing non-restored state (we don't pass --allowNonRestoredState), and we are not receiving any errors.

We have our consumers set up with the same consumer group, and using the same consumer (FlinkKafkaConsumer010), as in our 1.4 deploy.
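
For reference, the setup looks roughly like this (a minimal sketch; the topic, group id, and broker address are placeholders, not our real values):

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092");
    props.setProperty("group.id", "our-consumer-group"); // same group as the 1.4 deploy
    // auto.offset.reset is deliberately left unset (Kafka default)

    FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<>("our-topic", new SimpleStringSchema(), props);
    consumer.setStartFromGroupOffsets(); // the default start position anyway

    env.addSource(consumer);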

Has anyone encountered this? Any idea what we might be doing wrong?

What's also strange is that we are not setting auto.offset.reset, which defaults to largest (analogous to latest, correct?) -- which is not what we're seeing happen.

Regards,
 
Nik

Re: Upgrading from 1.4 to 1.8, losing Kafka consumer state

Paul Lam
Hi Nik,

Could you check the taskmanagers’ logs? When restoring from a savepoint/checkpoint, FlinkKafkaConsumer logs the starting offsets of the Kafka partitions.

WRT `auto.offset.reset` in the Kafka configuration, it has a relatively low priority: it is only used when there is no restored state and FlinkKafkaConsumer is set to start from group offsets (`setStartFromGroupOffsets()`).
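
Roughly, the start-position resolution goes like this (a simplified sketch of the logic, not the actual FlinkKafkaConsumer code; seekTo/seekToGroupOffsets are just illustrative helpers):

    if (restoredState != null) {
        // 1. offsets from the savepoint/checkpoint always win
        seekTo(restoredState);
    } else if (startupMode == StartupMode.GROUP_OFFSETS) {
        // 2. committed group offsets; only here does Kafka's
        //    auto.offset.reset apply, and only for partitions
        //    that have no committed offset
        seekToGroupOffsets();
    } else {
        // 3. explicit modes: earliest / latest / specific offsets / timestamp
        seekTo(startupMode);
    }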

Best,
Paul Lam

Re: Upgrading from 1.4 to 1.8, losing Kafka consumer state

Nikolas Davis
I checked the logs, per Paul's suggestion, and I see a couple of interesting things.

Restoring into 1.8 from a 1.4 savepoint, some TMs receive only partial state (e.g. only a partition/offset pair or two per TM -- we have 8 partitions on this topic). I'm not sure if this is normal (e.g. maybe TMs only ever receive the state they care about). Focusing on one topic, I noticed that for at least one partition there is no restored state at all. And regardless of there being some restored state, it appears that all consumers are starting from scratch. What's also weird is that, again, we start from the earliest offsets. The partition/offset state that is "restored" looks healthy -- e.g. valid partitions and offsets.

We use the default of setStartFromGroupOffsets as well as the default Kafka value for auto.offset.reset, which I believe should cause it to read from latest in the absence of state, not earliest. We are using the same consumer group as the legacy 1.4 app we are restoring from, and we shut off the 1.4 job before starting the new cluster.
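
For reference, the upgrade procedure we're testing is the standard savepoint dance (paths and job ids below are placeholders):

    # on the 1.4 cluster: take a savepoint and stop the job
    flink cancel -s hdfs:///flink/savepoints <1.4-job-id>

    # on the 1.8 cluster: resume from that savepoint
    flink run -s hdfs:///flink/savepoints/savepoint-<id> our-job.jar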

I also see a number of these log messages:

2019-05-24 22:10:57,479 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - No restore state for FlinkKafkaConsumer.
2019-05-24 22:10:59,511 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - No restore state for FlinkKafkaConsumer.
2019-05-24 22:10:59,524 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - No restore state for FlinkKafkaConsumer.

Using a 1.8 savepoint and restoring to 1.8, I see that each TM receives all of the state for the cluster, e.g. each receives every partition/offset pair for the topic I'm digging into, and I see none of the messages above. There is no inrush of data -- it appears to restore from the known offsets correctly.

Is there some change between these versions in how state is managed, or in what state is stored, that could cause this? I can post more of the logs if that would help. Is there some intermediate version of Flink (1.5-1.7) that we could restore into and take a savepoint from, to preserve the continuity of our state into 1.8? Any other thoughts?

Thanks again,

Nik Davis
Senior Software Engineer
New Relic

