http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Custom-TimestampExtractor-and-FlinkKafkaConsumer082-tp3488p3492.html
Are you also using the timestamp extractor when you are using env.fromCollection()?
Could you maybe insert a dummy mapper after the Kafka source that just prints the element and forwards it? That would show whether the elements come with a good timestamp from Kafka.
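Something along these lines might do as the dummy mapper. This is only a rough sketch: the local MapFunction interface is a stand-in for org.apache.flink.api.common.functions.MapFunction so the snippet compiles without Flink on the classpath, and the Pojo class is a hypothetical placeholder (only getTime() is taken from your mail). The class name PrintAndForward is made up as well.

```java
// Stand-in for org.apache.flink.api.common.functions.MapFunction, declared here
// only so the sketch compiles on its own. In the real job, import Flink's interface.
interface MapFunction<T, O> {
    O map(T value) throws Exception;
}

// Hypothetical event type; only getTime() comes from the original post.
class Pojo {
    private final long time;

    public Pojo(long time) {
        this.time = time;
    }

    public long getTime() {
        return time;
    }
}

// Prints the event-time field of every element and forwards the element unchanged.
class PrintAndForward implements MapFunction<Pojo, Pojo> {
    @Override
    public Pojo map(Pojo value) {
        System.out.println("from Kafka: time=" + value.getTime());
        return value;
    }
}
```

In the job it would sit between the source and the extractor, roughly stream.map(new PrintAndForward()).assignTimestamps(...). If the printed values are not epoch milliseconds, or are far apart from what you expect, that would already point at the deserialization rather than at the windowing.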
> On 15 Nov 2015, at 22:55, Konstantin Knauf <[hidden email]> wrote:
>
> Hi everyone,
>
> I have the following issue with Flink (0.10) and Kafka.
>
> I am using a very simple TimestampExtractor like [1], which just
> extracts a millis timestamp from a POJO. In my streaming job, I read in
> these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>
> stream = env.addSource(new FlinkKafkaConsumer082<>(
>     parameterTool.getRequired("topic"),
>     new AvroPojoDeserializationSchema(),
>     parameterTool.getProperties()));
>
> I have timestamps enabled, the TimeCharacteristic is EventTime, and the
> AutoWatermarkInterval is 500.
>
> The problem is, when I do something like:
>
> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>     .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
>     .sum(..)
>     .print();
>
> env.execute();
>
> the windows never get triggered.
>
> If I use ProcessingTime it works.
> If I use env.fromCollection(...) instead of the KafkaSource it works
> with EventTime, too.
>
> Any ideas what I could be doing wrong are highly appreciated.
>
> Cheers,
>
> Konstantin
>
> [1]:
>
> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
>
>     private final long maxDelay;
>
>     public PojoTimestampExtractor(long maxDelay) {
>         this.maxDelay = maxDelay;
>     }
>
>     @Override
>     public long extractTimestamp(Pojo pojo, long l) {
>         return pojo.getTime();
>     }
>
>     @Override
>     public long extractWatermark(Pojo pojo, long l) {
>         return pojo.getTime() - maxDelay;
>     }
>
>     @Override
>     public long getCurrentWatermark() {
>         return Long.MIN_VALUE;
>     }