Custom TimestampExtractor and FlinkKafkaConsumer082
Posted by snntr on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Custom-TimestampExtractor-and-FlinkKafkaConsumer082-tp3488.html
Hi everyone,
I have the following issue with Flink (0.10) and Kafka.
I am using a very simple TimestampExtractor like [1], which just
extracts a millis timestamp from a POJO. In my streaming job, I read in
these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
stream = env.addSource(new FlinkKafkaConsumer082<>(
        parameterTool.getRequired("topic"),
        new AvroPojoDeserializationSchema(),
        parameterTool.getProperties()));
I have timestamps enabled, the TimeCharacteristic is EventTime, and the
AutoWatermarkInterval is 500.
The problem is, when I do something like:
stream.assignTimestamps(new PojoTimestampExtractor(6000))
        .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
        .sum(..)
        .print();
env.execute();
the windows never get triggered.
If I use ProcessingTime it works.
If I use env.fromCollection(...) instead of the KafkaSource it works
with EventTime, too.
Any ideas what I could be doing wrong are highly appreciated.
Cheers,
Konstantin
[1]:
public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {

    // Maximum expected out-of-orderness, in milliseconds.
    private final long maxDelay;

    public PojoTimestampExtractor(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    @Override
    public long extractTimestamp(Pojo pojo, long currentTimestamp) {
        return pojo.getTime();
    }

    // The watermark trails each element's timestamp by maxDelay.
    @Override
    public long extractWatermark(Pojo pojo, long currentTimestamp) {
        return pojo.getTime() - maxDelay;
    }

    @Override
    public long getCurrentWatermark() {
        return Long.MIN_VALUE;
    }
}
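
To make the behaviour I expect concrete, here is a minimal plain-Java sketch (no Flink dependencies) of the watermark arithmetic in the extractor above. The class WatermarkSketch and its method names are hypothetical and only for illustration. The rule that a window counts as complete once the watermark reaches its end is a simplifying assumption, not necessarily Flink's exact trigger logic.

```java
public class WatermarkSketch {

    // Mirrors extractWatermark in [1]: the watermark trails the element by maxDelay.
    static long watermarkFor(long eventTime, long maxDelay) {
        return eventTime - maxDelay;
    }

    // Highest watermark after seeing the given event times in order.
    // Stays at Long.MIN_VALUE if no element has been seen at all.
    static long currentWatermark(long[] eventTimes, long maxDelay) {
        long watermark = Long.MIN_VALUE;
        for (long t : eventTimes) {
            watermark = Math.max(watermark, watermarkFor(t, maxDelay));
        }
        return watermark;
    }

    // ASSUMPTION: a window is treated as complete once the watermark reaches its end.
    static boolean windowComplete(long windowEnd, long watermark) {
        return watermark >= windowEnd;
    }

    public static void main(String[] args) {
        long maxDelay = 6000L;   // same value as in the job above
        long windowEnd = 1000L;  // end of the first 1-second window [0, 1000)

        long[] early = {0L, 400L, 900L, 3000L};
        long[] later = {0L, 400L, 900L, 3000L, 7000L};

        System.out.println(windowComplete(windowEnd, currentWatermark(early, maxDelay)));
        System.out.println(windowComplete(windowEnd, currentWatermark(later, maxDelay)));
    }
}
```

Under these assumptions, with maxDelay = 6000 the first one-second window can only complete after an element with a timestamp of at least 7000 has been seen, and a source that never sees an element never advances its watermark at all.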