Hi Tony,
Currently, the functionality that you described does not exist in the consumer. When a topic is deleted, as far as I know, the consumer would simply consider the partitions as unreachable and continue to try fetching records from them until they are up again.
I'm not entirely sure whether the Kafka API lets us distinguish a removed topic from a partition that is temporarily out of service because Kafka brokers are down. I may need to take a look.
As for the "workaround" that you are using at the moment, you can actually use `KeyedDeserializationSchema#isEndOfStream` for that. When it returns true and the source subtask closes, the `Long.MAX_VALUE` watermark will be emitted.
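To make that a bit more concrete, here is a rough sketch of what such a schema could look like. It assumes your producer writes a special marker record (the value "END" here is purely hypothetical) once no more data will arrive. So that the snippet compiles without Flink on the classpath, `KeyedSchemaSketch` is a stand-in I made up that mirrors only the two relevant methods of `KeyedDeserializationSchema`. In a real job you would implement the Flink interface itself, which also requires `getProducedType()` and whose `deserialize` may throw `IOException`.

```java
import java.nio.charset.StandardCharsets;

// Stand-in for Flink's KeyedDeserializationSchema (hypothetical name), reduced
// to the two methods that matter for this sketch.
interface KeyedSchemaSketch<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset);

    boolean isEndOfStream(T nextElement);
}

// Hypothetical schema: a record whose value equals the configured marker
// signals that the stream has ended.
class EndMarkerSchema implements KeyedSchemaSketch<String> {
    private final String endMarker;

    EndMarkerSchema(String endMarker) {
        this.endMarker = endMarker;
    }

    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        // Decode the record value as UTF-8 text; the key and metadata are ignored here.
        return message == null ? null : new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // Returns true only for the marker record; null values are never the end.
        return endMarker.equals(nextElement);
    }
}
```

Keep in mind that each source subtask evaluates `isEndOfStream` on the records it reads itself, so with this approach every subtask would need to see a marker record before the whole source finishes.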
Cheers,
Gordon