Timestamp from Kafka record and watermark generation

Federico D'Ambrosio-2
Hello everyone,

I'm consuming from a Kafka topic that I write to with a FlinkKafkaProducer, with the timestamp-related flag set to true.

From what I gather from the documentation [1], Flink is aware of the Kafka record's timestamp, so only the watermark needs to be generated by an appropriate TimestampExtractor. Still, I can't figure out how to implement this correctly.

I thought it would be possible to use the existing AscendingTimestampExtractor and override its extractTimestamp method, but that method is marked final:

    new FlinkKafkaConsumer010[Event](ingestion_topic, new JSONDeserializationSchema(), consumerConfig)
      .setStartFromLatest()
      .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Event]() {
        def extractAscendingTimestamp(element: Event): Long = ???
      })

Do I need to implement my own TimestampExtractor (with the appropriate getCurrentWatermark and extractTimestamp methods)?

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010

Thank you,
Federico

Re: Timestamp from Kafka record and watermark generation

Aljoscha Krettek
Hi,

This is a known issue: https://issues.apache.org/jira/browse/FLINK-8500. And yes, the workaround is to write an assigner from scratch, but you can start by copying the code of AscendingTimestampExtractor.
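As a rough illustration of that workaround, here is a minimal sketch of such an assigner. It assumes Flink 1.4's AssignerWithPeriodicWatermarks interface and that the Kafka 0.10 consumer passes the record timestamp in as previousElementTimestamp. The class name KafkaTimestampAssigner is hypothetical, and the watermark logic only mirrors the idea behind AscendingTimestampExtractor (watermark = highest timestamp seen minus 1); it is not a copy of the Flink source.

```scala
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical assigner: keeps the timestamp already attached to the record
// (assumed to be the Kafka record timestamp) and emits ascending watermarks.
class KafkaTimestampAssigner[T] extends AssignerWithPeriodicWatermarks[T] {

  // Highest timestamp seen so far; Long.MinValue means "nothing seen yet".
  private var currentTimestamp: Long = Long.MinValue

  override def extractTimestamp(element: T, previousElementTimestamp: Long): Long = {
    // Track the maximum, but return the record's own timestamp unchanged.
    if (previousElementTimestamp > currentTimestamp) {
      currentTimestamp = previousElementTimestamp
    }
    previousElementTimestamp
  }

  override def getCurrentWatermark(): Watermark = {
    // Stay one millisecond behind the highest timestamp seen so far.
    val ts = if (currentTimestamp == Long.MinValue) Long.MinValue else currentTimestamp - 1
    new Watermark(ts)
  }
}
```

With the consumer from the original question, this would be plugged in as .assignTimestampsAndWatermarks(new KafkaTimestampAssigner[Event]). Unlike AscendingTimestampExtractor, this sketch does not report or handle records that violate the ascending order; they are simply passed through with their own timestamp.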

Sorry for the inconvenience.

--
Aljoscha

In reply to Federico D'Ambrosio's message of 22. Feb 2018, 12:05.

Re: Timestamp from Kafka record and watermark generation

Federico D'Ambrosio-2
Thank you very much Aljoscha!

In reply to Aljoscha Krettek's message of 2018-02-23 14:45 GMT+01:00.

--
Federico D'Ambrosio