http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Custom-TimestampExtractor-and-FlinkKafkaConsumer082-tp3488p3493.html
thanks for your answer. Yes I am using the same TimestampExtractor-Class.
The timestamps look good to me. Here an example.
> Hi,
> are you also using the timestamp extractor when you are using env.fromCollection().
>
> Could you maybe insert a dummy mapper after the Kafka source that just prints the element and forwards it? To see if the elements come with a good timestamp from Kafka.
>
> Cheers,
> Aljoscha
>> On 15 Nov 2015, at 22:55, Konstantin Knauf <
[hidden email]> wrote:
>>
>> Hi everyone,
>>
>> I have the following issue with Flink (0.10) and Kafka.
>>
>> I am using a very simple TimestampExtractor like [1], which just
>> extracts a millis timestamp from a POJO. In my streaming job, I read in
>> these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>
>> stream = env.addSource(new FlinkKafkaConsumer082<
>> (parameterTool.getRequired("topic"),
>> new AvroPojoDeserializationSchema(),
>> parameterTool.getProperties()))
>>
>> I have timestampEnabled() and the TimeCharacteristics are EventTime,
>> AutoWatermarkIntervall is 500.
>>
>> The problem is, when I do something like:
>>
>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>> .timeWindowAll(Time.of(1, TimeUnit.SECONDS)
>> .sum(..)
>> .print()
>>
>> env.execute();
>>
>> the windows never get triggered.
>>
>> If I use ProcessingTime it works.
>> If I use env.fromCollection(...) instead of the KafkaSource it works
>> with EventTime, too.
>>
>> Any ideas what I could be doing wrong are highly appreciated.
>>
>> Cheers,
>>
>> Konstantin
>>
>> [1]:
>>
>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
>>
>> final private long maxDelay;
>>
>> public PojoTimestampExtractor(long maxDelay) {
>> this.maxDelay = maxDelay;
>> }
>>
>> @Override
>> public long extractTimestamp(Pojo fightEvent, long l) {
>> return pojo.getTime();
>> }
>>
>> @Override
>> public long extractWatermark(Pojo pojo, long l) {
>> return pojo.getTime() - maxDelay;
>> }
>>
>> @Override
>> public long getCurrentWatermark() {
>> return Long.MIN_VALUE;
>> }
>
>
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke