We've got a relatively simple job that reads from Kafka and writes to S3. We've had a couple of job failures where consumer lag had built up, but after the restart the lag was wiped out: our offset positions were lost and we consumed from the latest offset.
The job has checkpointing enabled:
```
val checkpointInterval = getProperty(checkpointIntervalPropertyKey).toInt
env.enableCheckpointing(checkpointInterval)
```
but we also have:
```
kafkaSource.setStartFromLatest()
```
set.
According to the documentation, the offsets stored in the checkpoint should override `setStartFromLatest()`. When the job restarts, we can see it retrieving the checkpoint state, but we also see a message about master state, as follows:
```
[16-Feb-2019 18:58:54.524 UTC] INFO <CheckpointCoordinator> Restoring from latest valid checkpoint: Checkpoint 2155 @ 1550342786207 for 6bc7420e001f76ffec7d2501d5f504c0.
[16-Feb-2019 18:58:54.525 UTC] INFO <CheckpointCoordinator> No master state to restore
```
Then after the job gets into a RUNNING state, it continues to checkpoint:
```
[16-Feb-2019 18:59:00.528 UTC] INFO <CheckpointCoordinator> Triggering checkpoint 2157 @ 1550343540525
```
Given the "No master state to restore" message and the fact that we seem to be starting from latest, I'm wondering whether we also need to explicitly set a group id, or meet some other requirement, for offsets to be properly stored in checkpoints?
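To make the question concrete, here is a minimal sketch of the kind of setup I mean, with an explicit group id added. This is not our actual code: the broker address, topic, group id, job name and the 60-second interval are placeholders, and I'm assuming the universal `FlinkKafkaConsumer` with a `SimpleStringSchema` purely for illustration (`print()` stands in for our S3 sink).

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KafkaCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Checkpoint every 60 s (placeholder; our job reads this from a property).
    env.enableCheckpointing(60000L)

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // placeholder
    props.setProperty("group.id", "my-flink-job")            // placeholder

    val kafkaSource =
      new FlinkKafkaConsumer[String]("my-topic", new SimpleStringSchema(), props)

    // As I read the docs, this start position should only apply when the job
    // starts without restored state; on restore from a checkpoint or savepoint
    // the offsets stored in that state are supposed to take precedence.
    kafkaSource.setStartFromLatest()

    // With checkpointing enabled, offsets are also committed back to Kafka
    // when a checkpoint completes. My understanding is that these commits are
    // for monitoring only and are not what Flink uses for recovery.
    kafkaSource.setCommitOffsetsOnCheckpoints(true)

    env.addSource(kafkaSource).print() // stand-in for the real S3 sink

    env.execute("kafka-checkpoint-sketch")
  }
}
```

If the group id only affects the offsets committed back to Kafka, then I would not expect adding it to change what gets restored from the checkpoint, which is why I'm unsure it is the missing piece.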
Regards,
Dave