Howdy,
We're in the process of upgrading to 1.8. When restoring state to the new cluster (using a savepoint), we are seeing our Kafka consumers restart from the earliest offset. We see no other indication that our state was rejected as part of the deploy: we do not allow non-restored state, and we are not receiving any errors. We have our consumers set up with the same consumer group and the same consumer (FlinkKafkaConsumer010) as our 1.4 deploy. Has anyone encountered this? Any idea what we might be doing wrong? What's also strange is that we are not setting auto.offset.reset, which defaults to largest (analogous to latest, correct?) -- which is not what we're seeing happen.
Regards,
Nik
Hi Nik,
Could you check the taskmanagers' logs? When restoring from a savepoint/checkpoint, FlinkKafkaConsumer logs the starting offsets of the Kafka partitions. WRT `auto.offset.reset` in the Kafka configuration, it has relatively low priority and is only used when there is no restored state and FlinkKafkaConsumer is set to start from group offsets (`setStartFromGroupOffsets`).
Best,
Paul Lam
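The precedence Paul describes can be sketched as a small decision function. This is only an illustrative model in plain Java, not Flink's actual implementation: the class `StartPositionSketch`, the `StartupMode` enum, and the `resolve` method are hypothetical names, and the step that prefers a committed group offset over `auto.offset.reset` is an assumption about how group-offsets mode usually behaves.

```java
import java.util.Optional;

public class StartPositionSketch {

    /** Hypothetical stand-in for the consumer's configured startup mode. */
    enum StartupMode { GROUP_OFFSETS, EARLIEST, LATEST }

    /**
     * Describes where a single partition would start reading under the
     * precedence from the reply above. Illustrative only; not Flink code.
     */
    static String resolve(Optional<Long> restoredOffset,
                          StartupMode mode,
                          Optional<Long> committedGroupOffset,
                          String autoOffsetReset) {
        // 1. An offset restored from a savepoint/checkpoint wins over everything else.
        if (restoredOffset.isPresent()) {
            return "restored offset " + restoredOffset.get();
        }
        // 2. Without restored state, an explicit earliest/latest startup mode decides.
        switch (mode) {
            case EARLIEST:
                return "earliest";
            case LATEST:
                return "latest";
            default:
                break; // GROUP_OFFSETS falls through to the checks below
        }
        // 3. Group-offsets mode: use the offset committed for the consumer group, if any
        //    (assumption: this is how group-offsets mode behaves).
        if (committedGroupOffset.isPresent()) {
            return "committed group offset " + committedGroupOffset.get();
        }
        // 4. Only when nothing else applies does auto.offset.reset come into play.
        return "auto.offset.reset=" + autoOffsetReset;
    }

    public static void main(String[] args) {
        // Restored state present: the savepoint offset is used.
        System.out.println(resolve(Optional.of(42L), StartupMode.GROUP_OFFSETS,
                Optional.of(40L), "latest"));
        // No restored state, group offsets mode, committed offset available.
        System.out.println(resolve(Optional.empty(), StartupMode.GROUP_OFFSETS,
                Optional.of(40L), "latest"));
        // No restored state and no committed offset: auto.offset.reset applies.
        System.out.println(resolve(Optional.empty(), StartupMode.GROUP_OFFSETS,
                Optional.empty(), "latest"));
    }
}
```

Under this model, a consumer that ends up reading from earliest while using the default group-offsets mode and the default `auto.offset.reset` did not follow steps 3 or 4, so the taskmanager logs around state restore are the place to look.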
I checked the logs thanks to Paul's suggestion. I see a couple of interesting things.

Restoring into 1.8 from a 1.4 savepoint, some TMs receive partial state (e.g. only a partition/offset pair or two per TM -- we have 8 partitions on this topic). I'm not sure if this is normal (e.g. maybe TMs used to receive only the state they care about). I focused on one topic, and I noticed that for at least 1 partition there is no restored state. Even though some state is restored, it appears that all consumers are starting from scratch. What's also weird is that, again, we start from the earliest offsets. The partition/offset state that is "restored" looks healthy -- e.g. valid partitions and offsets. We use the default of setStartFromGroupOffsets as well as the default Kafka option for auto.offset.reset, which I believe should cause it to read from latest in the absence of state, not earliest. We are using the same consumer group as the legacy 1.4 app that we are restoring from, and we shut off the 1.4 job before starting our new cluster up.

I also received some of these messages (logged at INFO level):
2019-05-24 22:10:57,479 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - No restore state for FlinkKafkaConsumer.
2019-05-24 22:10:59,511 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - No restore state for FlinkKafkaConsumer.
2019-05-24 22:10:59,524 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - No restore state for FlinkKafkaConsumer.

Using a 1.8 savepoint and restoring to 1.8, I see that each TM receives all state for the cluster, e.g. each receives every partition/offset pair for the topic I'm digging into. I also see none of the messages above. There is no inrush of data -- it appears to be restoring from known offsets well.

Is there some change in how state is managed or what state is stored between these versions that can cause this? I can post more of the logs if it is of help.
Is there some intermediate version of Flink (1.5-1.7) that we could restore into and take a new savepoint from, to ensure the continuity of our state in 1.8? Any other thoughts?
Thanks again,
Nik Davis
Senior Software Engineer
New Relic
On Fri, May 24, 2019 at 12:26 AM Paul Lam <[hidden email]> wrote: