Hi Hayden,
As far as I know, Flink's Kafka consumer does not support an end offset.
You could extend Flink's consumer. As you said, there is already code to set the starting offset (per partition), so you might be able to just piggyback on that.
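To make the idea a bit more concrete, here is a rough sketch of the bookkeeping such an extension would need. It is plain Java and deliberately does not touch any Flink classes: `EndOffsetTracker`, `shouldEmit` and `allPartitionsDone` are made-up names, and treating the end offset as inclusive is my assumption. Where exactly this would hook into the consumer is something I have not checked.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper (not part of Flink): tracks a per-partition end offset. */
class EndOffsetTracker {
    // partition id -> last offset that should still be emitted (inclusive, by assumption)
    private final Map<Integer, Long> endOffsets;
    // partitions with a configured end offset that have not reached it yet
    private final Set<Integer> openPartitions;

    EndOffsetTracker(Map<Integer, Long> endOffsets) {
        this.endOffsets = new HashMap<>(endOffsets);
        this.openPartitions = new HashSet<>(endOffsets.keySet());
    }

    /** Returns true if the record at (partition, offset) should still be emitted. */
    boolean shouldEmit(int partition, long offset) {
        Long end = endOffsets.get(partition);
        if (end == null) {
            return true; // no end offset configured: partition stays unbounded
        }
        if (offset >= end) {
            openPartitions.remove(partition); // this partition is finished
        }
        return offset <= end;
    }

    /** True once every partition with a configured end offset has reached it. */
    boolean allPartitionsDone() {
        return !endOffsets.isEmpty() && openPartitions.isEmpty();
    }
}
```

The consumer would ask `shouldEmit` for every record it reads and could stop once `allPartitionsDone` returns true. Partitions without a configured end offset are simply read forever, as today.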
Gordon (in CC), who has worked a lot on the Kafka connector, might have a better idea.
Best, Fabian