Hi Sendoh,
The KeyedDeserializationSchema gives you access to the topic name, partition id and offset of each message; they are passed as arguments to its deserialize method.
So you need to write a custom deserialization schema that implements KeyedDeserializationSchema (it is an interface) to get this information.
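
A rough sketch of what such a schema could look like is below. To keep it compilable without Flink on the classpath, it uses a local stand-in interface (KeyedSchemaSketch, a made-up name) that mirrors only the deserialize method of KeyedDeserializationSchema. RecordWithMetadata and MetadataSchema are hypothetical names too, and treating the payload as a UTF-8 string is an assumption. In a real job you would implement Flink's interface directly, which also requires isEndOfStream and getProducedType.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Local stand-in mirroring the deserialize method of Flink's
// KeyedDeserializationSchema, so the sketch compiles on its own.
// In a real job, implement the Flink interface instead.
interface KeyedSchemaSketch<T> {
    T deserialize(byte[] messageKey, byte[] message,
                  String topic, int partition, long offset) throws IOException;
}

// Hypothetical value type: the payload together with its Kafka metadata.
final class RecordWithMetadata {
    final String topic;
    final int partition;
    final long offset;
    final String value;

    RecordWithMetadata(String topic, int partition, long offset, String value) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.value = value;
    }
}

// Copies topic, partition and offset into every emitted record.
final class MetadataSchema implements KeyedSchemaSketch<RecordWithMetadata> {
    @Override
    public RecordWithMetadata deserialize(byte[] messageKey, byte[] message,
                                          String topic, int partition, long offset) {
        // Assumption: the message body is UTF-8 text; a null body stays null.
        String value = (message == null)
                ? null
                : new String(message, StandardCharsets.UTF_8);
        return new RecordWithMetadata(topic, partition, offset, value);
    }
}
```

You would then pass an instance of the schema to the Kafka consumer in place of a plain DeserializationSchema, and downstream operators can read the topic, partition and offset from each record.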
Regards,
Robert