http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Custom-TimestampExtractor-and-FlinkKafkaConsumer082-tp3488p3496.html
OK, now I at least understand why it works with fromElements(...). For
choice, if I understand the semantics correctly. It just affects
> Hi,
> it could be what Gyula mentioned. Let me first go a bit into how the TimestampExtractor works internally.
>
> First, the timestamp extractor internally keeps the value of the last emitted watermark. Then, the semantics of the TimestampExtractor are as follows:
> - the result of extractTimestamp is taken and it replaces the internal timestamp of the element
> - if the result of extractWatermark is larger than the last watermark, the new value is emitted as a watermark and stored as the last watermark
> - getCurrentWatermark is called on the specified auto-watermark interval; if the returned value is larger than the last watermark, it is emitted and stored as the last watermark
>
> What this means in your case is that the watermark can only advance if a new element arrives, because only then is the watermark updated.
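
The three rules above can be sketched as a small stand-alone simulation. This is only an illustration of the described semantics, not Flink's actual operator code: the `Extractor` interface, the `WatermarkSimulation` class and the `onElement`/`onTimer` methods are hypothetical names, and elements are reduced to bare `long` timestamps.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-alone simulation of the watermark rules described above; it does not use Flink. */
class WatermarkSimulation {

    /** Hypothetical stand-in for the three extractor methods discussed in this thread. */
    interface Extractor {
        long extractTimestamp(long elementTime);
        long extractWatermark(long elementTime);
        long getCurrentWatermark();
    }

    private final Extractor extractor;
    private long lastWatermark = Long.MIN_VALUE;   // value of the last emitted watermark
    final List<Long> emitted = new ArrayList<>();  // every watermark emitted so far

    WatermarkSimulation(Extractor extractor) {
        this.extractor = extractor;
    }

    /** Called for every arriving element; returns the timestamp assigned to it. */
    long onElement(long elementTime) {
        long timestamp = extractor.extractTimestamp(elementTime);
        emitIfLarger(extractor.extractWatermark(elementTime));
        return timestamp;
    }

    /** Called once per auto-watermark interval. */
    void onTimer() {
        emitIfLarger(extractor.getCurrentWatermark());
    }

    private void emitIfLarger(long candidate) {
        if (candidate > lastWatermark) {
            lastWatermark = candidate;
            emitted.add(candidate);
        }
    }

    /** Mirrors the extractor from the original question: a constant getCurrentWatermark. */
    static Extractor pojoLikeExtractor(final long maxDelay) {
        return new Extractor() {
            public long extractTimestamp(long t) { return t; }
            public long extractWatermark(long t) { return t - maxDelay; }
            public long getCurrentWatermark() { return Long.MIN_VALUE; }
        };
    }

    public static void main(String[] args) {
        WatermarkSimulation sim = new WatermarkSimulation(pojoLikeExtractor(6000));
        sim.onElement(10_000);  // emits watermark 4000
        sim.onTimer();          // Long.MIN_VALUE is not larger, nothing is emitted
        sim.onTimer();          // same again
        sim.onElement(12_000);  // emits watermark 6000
        System.out.println(sim.emitted);  // prints [4000, 6000]
    }
}
```

In this sketch the timer ticks never emit anything, because `Long.MIN_VALUE` is never larger than the stored watermark; only arriving elements move the watermark forward.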
>
> The reason why you see results if you use fromElements is that the window-operator also emits all the windows that it currently has buffered if the program closes. This happens in the case of fromElements because only a finite number of elements is emitted, after which the source closes, thereby finishing the whole program.
>
> Cheers,
> Aljoscha
>> On 16 Nov 2015, at 10:42, Gyula Fóra <[hidden email]> wrote:
>>
>> Could this part of the extractor be the problem Aljoscha?
>>
>> @Override
>> public long getCurrentWatermark() {
>>     return Long.MIN_VALUE;
>> }
>>
>> Gyula
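
For comparison, here is a rough sketch of what a non-constant getCurrentWatermark() could look like. It is not taken from the thread and is not claimed to fix the issue discussed here: `MaxTimestampTracker` is a hypothetical plain-Java stand-in (a real extractor would implement Flink's `TimestampExtractor<Pojo>`), and elements are reduced to bare `long` timestamps.

```java
/** Plain-Java stand-in that remembers the highest timestamp seen so far. */
class MaxTimestampTracker {

    private final long maxDelay;
    private long maxSeen = Long.MIN_VALUE;  // no element seen yet

    MaxTimestampTracker(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    /** Records the element time and returns it as the element's timestamp. */
    long extractTimestamp(long elementTime) {
        maxSeen = Math.max(maxSeen, elementTime);
        return elementTime;
    }

    /** Highest timestamp seen minus the allowed delay, or Long.MIN_VALUE before any element. */
    long getCurrentWatermark() {
        // Guard against underflow while no element has been seen.
        return maxSeen == Long.MIN_VALUE ? Long.MIN_VALUE : maxSeen - maxDelay;
    }

    public static void main(String[] args) {
        MaxTimestampTracker tracker = new MaxTimestampTracker(6000);
        System.out.println(tracker.getCurrentWatermark());  // prints -9223372036854775808
        tracker.extractTimestamp(10_000);
        tracker.extractTimestamp(9_000);                    // older element, maximum stays at 10000
        System.out.println(tracker.getCurrentWatermark());  // prints 4000
    }
}
```

Note that this value still changes only when elements arrive, so by itself it would not advance the watermark on an idle stream.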
>>
>> Konstantin Knauf <[hidden email]> wrote (on Mon, 16 Nov 2015, 10:39):
>> Hi Aljoscha,
>>
>> thanks for your answer. Yes, I am using the same TimestampExtractor class.
>>
>> The timestamps look good to me. Here is an example.
>>
>> {"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00
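
The conversion in this example can be double-checked with java.time. The snippet below is a small sketch; the class and method names (`EpochMillisCheck`, `toIsoString`) are made up for illustration, and the +01:00 offset is taken from the parsed value above.

```java
import java.time.Instant;
import java.time.ZoneOffset;

/** Converts an epoch-millis value to an ISO-8601 string at a fixed UTC offset. */
class EpochMillisCheck {

    static String toIsoString(long epochMillis, ZoneOffset offset) {
        return Instant.ofEpochMilli(epochMillis).atOffset(offset).toString();
    }

    public static void main(String[] args) {
        // The "time" field from the example, rendered at UTC+01:00.
        System.out.println(toIsoString(1447666537260L, ZoneOffset.ofHours(1)));
        // prints 2015-11-16T10:35:37.260+01:00
    }
}
```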
>>
>> The order now is
>>
>> stream
>> .map(dummyMapper)
>> .assignTimestamps(...)
>> .timeWindow(...)
>>
>> Is there a way to print out the assigned timestamps after
>> stream.assignTimestamps(...)?
>>
>> Cheers,
>>
>> Konstantin
>>
>>
>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
>>> Hi,
>>> are you also using the timestamp extractor when you are using env.fromCollection()?
>>>
>>> Could you maybe insert a dummy mapper after the Kafka source that just prints the element and forwards it? To see if the elements come with a good timestamp from Kafka.
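
A pass-through mapper of this kind could look roughly like the sketch below. To keep it runnable without Flink it uses `java.util.function.Function` as a stand-in; in a Flink job the same logic would sit inside a `MapFunction` passed to `map(...)`. The class name `PrintAndForward` and the `label` parameter are hypothetical.

```java
import java.util.function.Function;

/** Builds a function that prints each element and returns it unchanged. */
class PrintAndForward {

    static <T> Function<T, T> printing(final String label) {
        return element -> {
            System.out.println(label + ": " + element);  // side effect only, for debugging
            return element;                              // forward the element untouched
        };
    }

    public static void main(String[] args) {
        Function<String, String> mapper = printing("from-kafka");
        String forwarded = mapper.apply("{\"time\": 1447666537260}");
        // prints: from-kafka: {"time": 1447666537260}
        System.out.println(forwarded.length());  // prints 23
    }
}
```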
>>>
>>> Cheers,
>>> Aljoscha
>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf <[hidden email]> wrote:
>>>>
>>>> Hi everyone,
>>>>
>>>> I have the following issue with Flink (0.10) and Kafka.
>>>>
>>>> I am using a very simple TimestampExtractor like [1], which just
>>>> extracts a millis timestamp from a POJO. In my streaming job, I read in
>>>> these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>>>
>>>> stream = env.addSource(new FlinkKafkaConsumer082<>(
>>>>     parameterTool.getRequired("topic"),
>>>>     new AvroPojoDeserializationSchema(),
>>>>     parameterTool.getProperties()));
>>>>
>>>> I have timestampEnabled(), the TimeCharacteristic is EventTime, and the
>>>> AutoWatermarkInterval is 500.
>>>>
>>>> The problem is, when I do something like:
>>>>
>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>>>>     .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
>>>>     .sum(..)
>>>>     .print();
>>>>
>>>> env.execute();
>>>>
>>>> the windows never get triggered.
>>>>
>>>> If I use ProcessingTime it works.
>>>> If I use env.fromCollection(...) instead of the KafkaSource it works
>>>> with EventTime, too.
>>>>
>>>> Any ideas what I could be doing wrong are highly appreciated.
>>>>
>>>> Cheers,
>>>>
>>>> Konstantin
>>>>
>>>> [1]:
>>>>
>>>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
>>>>
>>>>     private final long maxDelay;
>>>>
>>>>     public PojoTimestampExtractor(long maxDelay) {
>>>>         this.maxDelay = maxDelay;
>>>>     }
>>>>
>>>>     @Override
>>>>     public long extractTimestamp(Pojo pojo, long l) {
>>>>         return pojo.getTime();
>>>>     }
>>>>
>>>>     @Override
>>>>     public long extractWatermark(Pojo pojo, long l) {
>>>>         return pojo.getTime() - maxDelay;
>>>>     }
>>>>
>>>>     @Override
>>>>     public long getCurrentWatermark() {
>>>>         return Long.MIN_VALUE;
>>>>     }
>>>> }
>>>
>>>
>>
>> --
>> Konstantin Knauf * [hidden email] * +49-174-3413182
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
>