Using Kafka 0.10.x timestamps as a record value in Flink Streaming
Posted by
Jia Teoh on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Using-Kafka-0-10-x-timestamps-as-a-record-value-in-Flink-Streaming-tp13119.html
Hi,
Is there a way to retrieve the timestamps that Kafka associates with each key-value pair within Flink? I would like to be able to use these as values within my application flow, and defining them before or after Kafka is not acceptable for the use case due to the latency involved in sending or receiving from Kafka.
It seems that Flink supports Kafka event time (link), but after a brief trace it appears that FlinkKafkaConsumer010 still relies on the Kafka09Fetcher for iterating through each Kafka record and deserializing it. The KeyedDeserializationSchema API does not seem to support including the timestamp as additional metadata (only offset, topic, and partition), so something such as JSONKeyValueDeserializationSchema will not return the Kafka-specified timestamp.
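To make the gap concrete, here is a minimal plain-Java sketch of the kind of hook being asked for. It does not use any Flink or Kafka classes: TimestampAwareDeserializationSchema, TimestampedValue, and StringWithTimestampSchema are hypothetical names invented for illustration. The first five parameters follow the shape of KeyedDeserializationSchema's deserialize method, and the sixth is assumed to be filled from ConsumerRecord.timestamp() by whatever fetcher calls the schema.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical interface, NOT part of Flink: the usual key/value/topic/partition/offset
// arguments plus the Kafka record timestamp as an extra argument.
interface TimestampAwareDeserializationSchema<T> {
    T deserialize(byte[] messageKey, byte[] message,
                  String topic, int partition, long offset,
                  long kafkaTimestamp);
}

// Hypothetical value type that carries the payload together with Kafka's timestamp,
// so downstream operators can read the timestamp as an ordinary field.
final class TimestampedValue {
    final String key;
    final String value;
    final long kafkaTimestamp;

    TimestampedValue(String key, String value, long kafkaTimestamp) {
        this.key = key;
        this.value = value;
        this.kafkaTimestamp = kafkaTimestamp;
    }
}

// Hypothetical implementation: decodes key and value as UTF-8 strings and
// copies the timestamp into the emitted record unchanged.
final class StringWithTimestampSchema
        implements TimestampAwareDeserializationSchema<TimestampedValue> {
    @Override
    public TimestampedValue deserialize(byte[] messageKey, byte[] message,
                                        String topic, int partition, long offset,
                                        long kafkaTimestamp) {
        String key = messageKey == null ? null : new String(messageKey, StandardCharsets.UTF_8);
        String value = message == null ? null : new String(message, StandardCharsets.UTF_8);
        return new TimestampedValue(key, value, kafkaTimestamp);
    }
}

class TimestampSchemaSketch {
    public static void main(String[] args) {
        // Stand-in for what a fetcher would pass for one Kafka record (made-up sample values).
        TimestampedValue record = new StringWithTimestampSchema().deserialize(
                "user-1".getBytes(StandardCharsets.UTF_8),
                "clicked".getBytes(StandardCharsets.UTF_8),
                "events", 0, 42L, 1493000000000L);
        System.out.println(record.key + " " + record.value + " " + record.kafkaTimestamp);
    }
}
```

With something along these lines, the timestamp would travel with each element as a regular field rather than having to be assigned before producing to or after consuming from Kafka.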
For reference, I am using Kafka 0.10.2 and the Flink-Streaming API + Kafka Connector (1.2.1).
Thanks,
Jia Teoh