Custom TimestampExtractor and FlinkKafkaConsumer082

Custom TimestampExtractor and FlinkKafkaConsumer082

snntr
Hi everyone,

I have the following issue with Flink (0.10) and Kafka.

I am using a very simple TimestampExtractor like [1], which just
extracts a milliseconds timestamp from a POJO. In my streaming job, I read
these POJOs from Kafka using the FlinkKafkaConsumer082 like this:

stream = env.addSource(new FlinkKafkaConsumer082<>(
        parameterTool.getRequired("topic"),
        new AvroPojoDeserializationSchema(),
        parameterTool.getProperties()));

I have timestamps enabled, the TimeCharacteristic is EventTime, and the
auto-watermark interval is 500 ms.
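
Concretely, that setup corresponds roughly to the following (just a sketch of
what I mean; setStreamTimeCharacteristic, enableTimestamps and
setAutoWatermarkInterval are the Flink 0.10 methods I am referring to):

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

// event time, with element timestamps and periodic ("auto") watermarks
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().enableTimestamps();
env.getConfig().setAutoWatermarkInterval(500);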

The problem is, when I do something like:

stream.assignTimestamps(new PojoTimestampExtractor(6000))
    .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
    .sum(...)
    .print();

env.execute();

the windows never get triggered.

If I use ProcessingTime it works.
If I use env.fromCollection(...) instead of the KafkaSource it works
with EventTime, too.

Any ideas what I could be doing wrong are highly appreciated.

Cheers,

Konstantin

[1]:

public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {

    private final long maxDelay;

    public PojoTimestampExtractor(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    @Override
    public long extractTimestamp(Pojo pojo, long l) {
        // use the millisecond timestamp carried by the POJO
        return pojo.getTime();
    }

    @Override
    public long extractWatermark(Pojo pojo, long l) {
        // emit a watermark with every element, allowing maxDelay of lateness
        return pojo.getTime() - maxDelay;
    }

    @Override
    public long getCurrentWatermark() {
        // no additional periodic watermarks
        return Long.MIN_VALUE;
    }
}

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

Aljoscha Krettek
Hi,
are you also using the timestamp extractor when you are using env.fromCollection()?

Could you maybe insert a dummy mapper after the Kafka source that just prints each element and forwards it, to see whether the elements come out of Kafka with good timestamps?

Cheers,
Aljoscha

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

snntr
Hi Aljoscha,

thanks for your answer. Yes, I am using the same TimestampExtractor class.

The timestamps look good to me. Here is an example:

{"time": 1447666537260, ...}, which parses to 2015-11-16T10:35:37.260+01:00.

The order now is:

stream
    .map(dummyMapper)
    .assignTimestamps(...)
    .timeWindow(...)

Is there a way to print out the assigned timestamps after
stream.assignTimestamps(...)?

Cheers,

Konstantin


--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

Gyula Fóra
Could this part of the extractor be the problem, Aljoscha?

@Override
public long getCurrentWatermark() {
    return Long.MIN_VALUE;
}

Gyula


Re: Custom TimestampExtractor and FlinkKafkaConsumer082

Aljoscha Krettek
Hi,
it could be what Gyula mentioned. Let me first go a bit into how the TimestampExtractor works internally.

First, the timestamp extractor internally keeps the value of the last emitted watermark. The semantics of the TimestampExtractor are then as follows:
 - the result of extractTimestamp replaces the internal timestamp of the element
 - if the result of extractWatermark is larger than the last watermark, the new value is emitted as a watermark and stored
 - getCurrentWatermark is called at the specified auto-watermark interval; if the returned value is larger than the last watermark, it is emitted and stored as the last watermark

What this means in your case is that the watermark can only advance when a new element arrives, because only then is the watermark updated.

The reason why you see results with fromElements is that the window operator also emits all the windows it currently has buffered when the program closes. This happens with fromElements because only a finite number of elements is emitted, after which the source closes, thereby finishing the whole program.
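
In code, the bookkeeping described above looks roughly like this (just a schematic to illustrate the three calls, not the actual operator implementation; Extractor stands in for your TimestampExtractor and emitWatermark() for sending a watermark downstream):

class ExtractionSketch<T> {

    interface Extractor<T> {
        long extractTimestamp(T element, long currentTimestamp);
        long extractWatermark(T element, long extractedTimestamp);
        long getCurrentWatermark();
    }

    private final Extractor<T> extractor;
    private long lastWatermark = Long.MIN_VALUE;

    ExtractionSketch(Extractor<T> extractor) {
        this.extractor = extractor;
    }

    // called for every element; the returned value becomes the element's timestamp
    long onElement(T element, long currentTimestamp) {
        long ts = extractor.extractTimestamp(element, currentTimestamp);
        long wm = extractor.extractWatermark(element, ts);
        if (wm > lastWatermark) {   // emit only if the watermark advances
            lastWatermark = wm;
            emitWatermark(wm);
        }
        return ts;
    }

    // called on every auto-watermark interval
    void onAutoWatermarkTimer() {
        long wm = extractor.getCurrentWatermark();
        if (wm > lastWatermark) {   // emit only if the watermark advances
            lastWatermark = wm;
            emitWatermark(wm);
        }
    }

    private void emitWatermark(long watermark) {
        System.out.println("watermark: " + watermark);
    }
}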

Cheers,
Aljoscha


Re: Custom TimestampExtractor and FlinkKafkaConsumer082

snntr
Hi Aljoscha,

ok, now I at least understand why it works with fromElements(...). For
the rest I am not so sure.

> What this means in your case is that the watermark can only advance
> when a new element arrives, because only then is the watermark updated.

But new elements arrive all the time, about 50/s, or do you mean
something else?

getCurrentWatermark returning Long.MIN_VALUE still seems to be an OK
choice, if I understand the semantics correctly. It just affects
watermarking in the absence of events, right?

Cheers,

Konstantin


Re: Custom TimestampExtractor and FlinkKafkaConsumer082

Aljoscha Krettek
Hi,
yes, at your data-rate emitting a watermark for every element should not be a problem. It could become a problem with higher data-rates since the system can get overwhelmed if every element also generates a watermark. In that case I would suggest storing the lastest element-timestamp in an internal field and only emitting in getCurrentWatermark(), since then, then the watermark interval can be tunes using the auto-watermark interval setting.

But that should not be the cause of the problem that you currently have. Would you maybe be willing to send me some (mock) example data and the code so that I can reproduce the problem and have a look at it? to aljoscha at apache.org.

Cheers,
Aljoscha


Re: Custom TimestampExtractor and FlinkKafkaConsumer082

snntr
Hi Aljoscha,

I changed the TimestampExtractor to save the last seen timestamp and only
emit it via getCurrentWatermark [1], as you suggested. So basically I now
do the opposite of before (only watermarks on the auto-watermark interval
instead of only watermarks per event). And now it works :). The question
remains why it did not work before. As far as I can see, it is an issue
with the first TimestampExtractor itself?!

Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?

Cheers,

Konstantin

[1]

public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {

    private final long maxDelay;
    private long lastTimestamp = Long.MIN_VALUE;

    public PojoTimestampExtractor(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    @Override
    public long extractTimestamp(Pojo pojo, long l) {
        // remember the latest element timestamp ...
        lastTimestamp = pojo.getTime();
        return pojo.getTime();
    }

    @Override
    public long extractWatermark(Pojo pojo, long l) {
        // ... but never emit per-element watermarks
        return Long.MIN_VALUE;
    }

    @Override
    public long getCurrentWatermark() {
        // emit watermarks only on the auto-watermark interval
        return lastTimestamp - maxDelay;
    }
}


Re: Custom TimestampExtractor and FlinkKafkaConsumer082

Aljoscha Krettek
Hi,
yes, unfortunately there is a bug in the timestamp extraction operator: when getCurrentWatermark() is called, it sets the “last-seen watermark” to Long.MIN_VALUE even though it should not.
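
In code, the effect is roughly this (a schematic of the faulty bookkeeping, not the actual operator source):

class BuggyBookkeepingSketch {

    private long lastWatermark = Long.MIN_VALUE;

    // runs on every auto-watermark interval
    void onAutoWatermarkTimer(long candidate /* = extractor.getCurrentWatermark() */) {
        if (candidate > lastWatermark) {
            System.out.println("emit watermark " + candidate);
        }
        // bug: the stored watermark is updated unconditionally, so a
        // Long.MIN_VALUE result resets it instead of being ignored
        lastWatermark = candidate;
    }
}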

I’m opening an Issue and adding a fix to the latest master and the branch for the 0.10.x bugfix releases. Sorry for the inconvenience.

Cheers,
Aljoscha


Re: Custom TimestampExtractor and FlinkKafkaConsumer082

Aljoscha Krettek
In reply to this post by snntr
Hi,
actually, the bug is more subtle. Normally, it is not a problem that the TimestampExtractor sometimes emits a watermark that is lower than the one before. (This is the result of the bug with Long.MIN_VALUE I mentioned before). The stream operators wait for watermarks from all upstream operators and only advance the watermark monotonically in lockstep with them. This way, the watermark cannot decrease at an operator.

In your case, you have a topology with parallelism 1, I assume. In that case the operators are chained (there are no separate operators, basically just one operator, and element transmission happens via function calls). In this setting the watermarks are forwarded directly, without going through the logic I mentioned above.
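
For comparison, the non-chained path does roughly the following (again just a schematic of the alignment logic, not the actual code):

class WatermarkAlignmentSketch {

    private final long[] lastWatermarkPerChannel;   // one entry per input channel
    private long currentWatermark = Long.MIN_VALUE;

    WatermarkAlignmentSketch(int numberOfInputChannels) {
        lastWatermarkPerChannel = new long[numberOfInputChannels];
        java.util.Arrays.fill(lastWatermarkPerChannel, Long.MIN_VALUE);
    }

    // called when a watermark arrives on one input channel
    void onWatermark(int channel, long watermark) {
        lastWatermarkPerChannel[channel] =
                Math.max(lastWatermarkPerChannel[channel], watermark);

        // advance only to the minimum over all channels, so a single low
        // watermark can never move the operator's watermark backwards
        long min = Long.MAX_VALUE;
        for (long wm : lastWatermarkPerChannel) {
            min = Math.min(min, wm);
        }
        if (min > currentWatermark) {
            currentWatermark = min;
            System.out.println("forward watermark " + currentWatermark);
        }
    }
}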

Cheers,
Aljoscha


Re: Custom TimestampExtractor and FlinkKafkaConsumer082

snntr
Hi Aljoscha,

Are you sure? I am running the job from my IDE at the moment.

If I set

StreamExecutionEnvironment.setParallelism(1);

it works with the old TimestampExtractor (returning Long.MIN_VALUE from
getCurrentWatermark() and emitting a watermark for every record).

If I set

StreamExecutionEnvironment.setParallelism(5);

it does not work.

So, if I understood you correctly, it is the opposite of what you were
expecting?!

Cheers,

Konstantin


>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
>

--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

Aljoscha Krettek
Hmm, that’s very strange. I’ll continue looking into it.

> On 17 Nov 2015, at 21:42, Konstantin Knauf <[hidden email]> wrote:
>
> Hi Aljoscha,
>
> Are you sure? I am running the job from my IDE at the moment.
>
> If I set
>
> StreamExecutionEnvironment.setParallelism(1);
>
> I works with the old TimestampExtractor (returning Long.MIN_VALUE from
> getCurrentWatermark() and emitting a watermark at every record)
>
> If I set
>
> StreamExecutionEnvironment.setParallelism(5);
>
> it does not work.
>
> So, if I understood you correctly, it is the opposite of what you were
> expecting?!
>
> Cheers,
>
> Konstantin
>
>
> On 17.11.2015 11:32, Aljoscha Krettek wrote:
>> Hi,
>> actually, the bug is more subtle. Normally, it is not a problem that the TimestampExtractor sometimes emits a watermark that is lower than the one before. (This is the result of the bug with Long.MIN_VALUE I mentioned before). The stream operators wait for watermarks from all upstream operators and only advance the watermark monotonically in lockstep with them. This way, the watermark cannot decrease at an operator.
>>
>> In your case, you have a topology with parallelism 1, I assume. In that case the operators are chained. (There is no separate operators but basically only one operator and element transmission happens in function calls). In this setting the watermarks are directly forwarded to operators without going through the logic I mentioned above.
>>
>> Cheers,
>> Aljoscha
>>> On 16 Nov 2015, at 18:13, Konstantin Knauf <[hidden email]> wrote:
>>>
>>> Hi Aljoscha,
>>>
>>> I changed the Timestamp Extraktor to save the lastSeenTimestamp and only
>>> emit with getCurrentWatermark [1] as you suggested. So basically I do
>>> the opposite than before (only watermarks per events vs only watermarks
>>> per autowatermark). And now it works :). The question remains, why it
>>> did not work before. As far as I see, it is an issue with the first
>>> TimestmapExtractor itself?!
>>>
>>> Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>> [1]
>>>
>>>   final private long maxDelay;
>>>   private long lastTimestamp = Long.MIN_VALUE;
>>>
>>>   public PojoTimestampExtractor(long maxDelay) {
>>>       this.maxDelay = maxDelay;
>>>   }
>>>
>>>   @Override
>>>   public long extractTimestamp(Pojo pojo, long l) {
>>>       lastTimestamp = pojo.getTime();
>>>       return pojo.getTime();
>>>   }
>>>
>>>   @Override
>>>   public long extractWatermark(Pojo pojo, long l) {
>>>       return Long.MIN_VALUE;
>>>   }
>>>
>>>   @Override
>>>   public long getCurrentWatermark() {
>>>       return lastTimestamp - maxDelay;
>>>   }
>>>
>>>
>>> On 16.11.2015 13:37, Aljoscha Krettek wrote:
>>>> Hi,
>>>> yes, at your data-rate emitting a watermark for every element should not be a problem. It could become a problem with higher data-rates since the system can get overwhelmed if every element also generates a watermark. In that case I would suggest storing the lastest element-timestamp in an internal field and only emitting in getCurrentWatermark(), since then, then the watermark interval can be tunes using the auto-watermark interval setting.
>>>>
>>>> But that should not be the cause of the problem that you currently have. Would you maybe be willing to send me some (mock) example data and the code so that I can reproduce the problem and have a look at it? to aljoscha at apache.org.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>> On 16 Nov 2015, at 13:05, Konstantin Knauf <[hidden email]> wrote:
>>>>>
>>>>> Hi Aljoscha,
>>>>>
>>>>> ok, now I at least understand, why it works with fromElements(...). For
>>>>> the rest I am not so sure.
>>>>>
>>>>>> What this means in your case is that the watermark can only advance if
>>>>> a new element arrives, because only then is the watermark updated.
>>>>>
>>>>> But new elements arrive all the time, about 50/s, or do you mean
>>>>> something else?
>>>>>
>>>>> getCurrentWatermark returning Long.MIN_VALUE still seems to be an ok
>>>>> choice, if i understand the semantics correctly. It just affects
>>>>> watermarking in the absence of events, right?
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Konstantin
>>>>>
>>>>>
>>>>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
>>>>>> Hi,
>>>>>> it could be what Gyula mentioned. Let me first go a bit into how the TimestampExtractor works internally.
>>>>>>
>>>>>> First, the timestamp extractor internally keeps the value of the last emitted watermark. Then, the semantics of the TimestampExtractor are as follows :
>>>>>> - the result of extractTimestamp is taken and it replaces the internal timestamp of the element
>>>>>> - if the result of extractWatermark is larger than the last watermark the new value is emitted as a watermark and the value is stored
>>>>>> - getCurrentWatermark is called on the specified auto-watermark interval, if the returned value is larger than the last watermark it is emitted and stored as last watermark
>>>>>>
>>>>>> What this means in your case is that the watermark can only advance if a new element arrives, because only then is the watermark updated.
>>>>>>
>>>>>> The reason why you see results if you use fromElements is that the window-operator also emits all the windows that it currently has buffered if the program closes. This happens in the case of fromElements because only a finite number of elements is emitted, after which the source closes, thereby finishing the whole program.
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <[hidden email]> wrote:
>>>>>>>
>>>>>>> Could this part of the extractor be the problem Aljoscha?
>>>>>>>
>>>>>>> @Override
>>>>>>>  public long getCurrentWatermark() {
>>>>>>>      return Long.MIN_VALUE;
>>>>>>>  }
>>>>>>>
>>>>>>> Gyula
>>>>>>>
>>>>>>> Konstantin Knauf <[hidden email]> ezt írta (időpont: 2015. nov. 16., H, 10:39):
>>>>>>> Hi Aljoscha,
>>>>>>>
>>>>>>> thanks for your answer. Yes I am using the same TimestampExtractor-Class.
>>>>>>>
>>>>>>> The timestamps look good to me. Here an example.
>>>>>>>
>>>>>>> {"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00
>>>>>>>
>>>>>>> The order now is
>>>>>>>
>>>>>>> stream
>>>>>>> .map(dummyMapper)
>>>>>>> .assignTimestamps(...)
>>>>>>> .timeWindow(...)
>>>>>>>
>>>>>>> Is there a way to print out the assigned timestamps after
>>>>>>> stream.assignTimestamps(...)?
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> Konstantin
>>>>>>>
>>>>>>>
>>>>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
>>>>>>>> Hi,
>>>>>>>> are you also using the timestamp extractor when you are using env.fromCollection().
>>>>>>>>
>>>>>>>> Could you maybe insert a dummy mapper after the Kafka source that just prints the element and forwards it? To see if the elements come with a good timestamp from Kafka.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>> Hi everyone,
>>>>>>>>>
>>>>>>>>> I have the following issue with Flink (0.10) and Kafka.
>>>>>>>>>
>>>>>>>>> I am using a very simple TimestampExtractor like [1], which just
>>>>>>>>> extracts a millis timestamp from a POJO. In my streaming job, I read in
>>>>>>>>> these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>>>>>>>>
>>>>>>>>> stream = env.addSource(new FlinkKafkaConsumer082<
>>>>>>>>> (parameterTool.getRequired("topic"),
>>>>>>>>>             new AvroPojoDeserializationSchema(),
>>>>>>>>> parameterTool.getProperties()))
>>>>>>>>>
>>>>>>>>> I have timestampEnabled() and the TimeCharacteristics are EventTime,
>>>>>>>>> AutoWatermarkIntervall is 500.
>>>>>>>>>
>>>>>>>>> The problem is, when I do something like:
>>>>>>>>>
>>>>>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>>>>>>>>> .timeWindowAll(Time.of(1, TimeUnit.SECONDS)
>>>>>>>>> .sum(..)
>>>>>>>>> .print()
>>>>>>>>>
>>>>>>>>> env.execute();
>>>>>>>>>
>>>>>>>>> the windows never get triggered.
>>>>>>>>>
>>>>>>>>> If I use ProcessingTime it works.
>>>>>>>>> If I use env.fromCollection(...) instead of the KafkaSource it works
>>>>>>>>> with EventTime, too.
>>>>>>>>>
>>>>>>>>> Any ideas what I could be doing wrong are highly appreciated.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>>
>>>>>>>>> Konstantin
>>>>>>>>>
>>>>>>>>> [1]:
>>>>>>>>>
>>>>>>>>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
>>>>>>>>>
>>>>>>>>> final private long maxDelay;
>>>>>>>>>
>>>>>>>>> public  PojoTimestampExtractor(long maxDelay) {
>>>>>>>>>     this.maxDelay = maxDelay;
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> @Override
>>>>>>>>> public long extractTimestamp(Pojo fightEvent, long l) {
>>>>>>>>>     return pojo.getTime();
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> @Override
>>>>>>>>> public long extractWatermark(Pojo pojo, long l) {
>>>>>>>>>     return pojo.getTime() - maxDelay;
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> @Override
>>>>>>>>> public long getCurrentWatermark() {
>>>>>>>>>     return Long.MIN_VALUE;
>>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Konstantin Knauf * [hidden email] * +49-174-3413182
>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Konstantin Knauf * [hidden email] * +49-174-3413182
>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>
>>>>
>>>
>>> --
>>> Konstantin Knauf * [hidden email] * +49-174-3413182
>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>
>>
>
> --
> Konstantin Knauf * [hidden email] * +49-174-3413182
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082


Re: Custom TimestampExtractor and FlinkKafkaConsumer082

Aljoscha Krettek
In reply to this post by snntr
Hi Konstantin,
I still haven’t come up with an explanation for the behavior. Could you maybe send me example code (and example data, if that is necessary to reproduce the problem)? This would really help me pinpoint the issue.

Cheers,
Aljoscha

> On 17 Nov 2015, at 21:42, Konstantin Knauf <[hidden email]> wrote:
>
> Hi Aljoscha,
>
> Are you sure? I am running the job from my IDE at the moment.
>
> If I set
>
> StreamExecutionEnvironment.setParallelism(1);
>
> I works with the old TimestampExtractor (returning Long.MIN_VALUE from
> getCurrentWatermark() and emitting a watermark at every record)
>
> If I set
>
> StreamExecutionEnvironment.setParallelism(5);
>
> it does not work.
>
> So, if I understood you correctly, it is the opposite of what you were
> expecting?!
>
> Cheers,
>
> Konstantin
>
>
> On 17.11.2015 11:32, Aljoscha Krettek wrote:
>> Hi,
>> actually, the bug is more subtle. Normally, it is not a problem that the TimestampExtractor sometimes emits a watermark that is lower than the one before. (This is the result of the bug with Long.MIN_VALUE I mentioned before). The stream operators wait for watermarks from all upstream operators and only advance the watermark monotonically in lockstep with them. This way, the watermark cannot decrease at an operator.
>>
>> In your case, you have a topology with parallelism 1, I assume. In that case the operators are chained. (There is no separate operators but basically only one operator and element transmission happens in function calls). In this setting the watermarks are directly forwarded to operators without going through the logic I mentioned above.
>>
>> Cheers,
>> Aljoscha
>>> On 16 Nov 2015, at 18:13, Konstantin Knauf <[hidden email]> wrote:
>>>
>>> Hi Aljoscha,
>>>
>>> I changed the Timestamp Extraktor to save the lastSeenTimestamp and only
>>> emit with getCurrentWatermark [1] as you suggested. So basically I do
>>> the opposite than before (only watermarks per events vs only watermarks
>>> per autowatermark). And now it works :). The question remains, why it
>>> did not work before. As far as I see, it is an issue with the first
>>> TimestmapExtractor itself?!
>>>
>>> Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>> [1]
>>>
>>>   final private long maxDelay;
>>>   private long lastTimestamp = Long.MIN_VALUE;
>>>
>>>   public PojoTimestampExtractor(long maxDelay) {
>>>       this.maxDelay = maxDelay;
>>>   }
>>>
>>>   @Override
>>>   public long extractTimestamp(Pojo pojo, long l) {
>>>       lastTimestamp = pojo.getTime();
>>>       return pojo.getTime();
>>>   }
>>>
>>>   @Override
>>>   public long extractWatermark(Pojo pojo, long l) {
>>>       return Long.MIN_VALUE;
>>>   }
>>>
>>>   @Override
>>>   public long getCurrentWatermark() {
>>>       return lastTimestamp - maxDelay;
>>>   }
>>>
>>>
>>> On 16.11.2015 13:37, Aljoscha Krettek wrote:
>>>> Hi,
>>>> yes, at your data-rate emitting a watermark for every element should not be a problem. It could become a problem with higher data-rates since the system can get overwhelmed if every element also generates a watermark. In that case I would suggest storing the lastest element-timestamp in an internal field and only emitting in getCurrentWatermark(), since then, then the watermark interval can be tunes using the auto-watermark interval setting.
>>>>
>>>> But that should not be the cause of the problem that you currently have. Would you maybe be willing to send me some (mock) example data and the code so that I can reproduce the problem and have a look at it? to aljoscha at apache.org.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>> On 16 Nov 2015, at 13:05, Konstantin Knauf <[hidden email]> wrote:
>>>>>
>>>>> Hi Aljoscha,
>>>>>
>>>>> ok, now I at least understand, why it works with fromElements(...). For
>>>>> the rest I am not so sure.
>>>>>
>>>>>> What this means in your case is that the watermark can only advance if
>>>>> a new element arrives, because only then is the watermark updated.
>>>>>
>>>>> But new elements arrive all the time, about 50/s, or do you mean
>>>>> something else?
>>>>>
>>>>> getCurrentWatermark returning Long.MIN_VALUE still seems to be an ok
>>>>> choice, if i understand the semantics correctly. It just affects
>>>>> watermarking in the absence of events, right?
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Konstantin
>>>>>
>>>>>
>>>>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
>>>>>> Hi,
>>>>>> it could be what Gyula mentioned. Let me first go a bit into how the TimestampExtractor works internally.
>>>>>>
>>>>>> First, the timestamp extractor internally keeps the value of the last emitted watermark. Then, the semantics of the TimestampExtractor are as follows :
>>>>>> - the result of extractTimestamp is taken and it replaces the internal timestamp of the element
>>>>>> - if the result of extractWatermark is larger than the last watermark the new value is emitted as a watermark and the value is stored
>>>>>> - getCurrentWatermark is called on the specified auto-watermark interval, if the returned value is larger than the last watermark it is emitted and stored as last watermark
>>>>>>
>>>>>> What this means in your case is that the watermark can only advance if a new element arrives, because only then is the watermark updated.
>>>>>>
>>>>>> The reason why you see results if you use fromElements is that the window-operator also emits all the windows that it currently has buffered if the program closes. This happens in the case of fromElements because only a finite number of elements is emitted, after which the source closes, thereby finishing the whole program.
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <[hidden email]> wrote:
>>>>>>>
>>>>>>> Could this part of the extractor be the problem Aljoscha?
>>>>>>>
>>>>>>> @Override
>>>>>>>  public long getCurrentWatermark() {
>>>>>>>      return Long.MIN_VALUE;
>>>>>>>  }
>>>>>>>
>>>>>>> Gyula
>>>>>>>
>>>>>>> Konstantin Knauf <[hidden email]> ezt írta (időpont: 2015. nov. 16., H, 10:39):
>>>>>>> Hi Aljoscha,
>>>>>>>
>>>>>>> thanks for your answer. Yes I am using the same TimestampExtractor-Class.
>>>>>>>
>>>>>>> The timestamps look good to me. Here an example.
>>>>>>>
>>>>>>> {"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00
>>>>>>>
>>>>>>> The order now is
>>>>>>>
>>>>>>> stream
>>>>>>> .map(dummyMapper)
>>>>>>> .assignTimestamps(...)
>>>>>>> .timeWindow(...)
>>>>>>>
>>>>>>> Is there a way to print out the assigned timestamps after
>>>>>>> stream.assignTimestamps(...)?
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> Konstantin
>>>>>>>
>>>>>>>
>>>>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
>>>>>>>> Hi,
>>>>>>>> are you also using the timestamp extractor when you are using env.fromCollection().
>>>>>>>>
>>>>>>>> Could you maybe insert a dummy mapper after the Kafka source that just prints the element and forwards it? To see if the elements come with a good timestamp from Kafka.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>> Hi everyone,
>>>>>>>>>
>>>>>>>>> I have the following issue with Flink (0.10) and Kafka.
>>>>>>>>>
>>>>>>>>> I am using a very simple TimestampExtractor like [1], which just
>>>>>>>>> extracts a millis timestamp from a POJO. In my streaming job, I read in
>>>>>>>>> these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>>>>>>>>
>>>>>>>>> stream = env.addSource(new FlinkKafkaConsumer082<
>>>>>>>>> (parameterTool.getRequired("topic"),
>>>>>>>>>             new AvroPojoDeserializationSchema(),
>>>>>>>>> parameterTool.getProperties()))
>>>>>>>>>
>>>>>>>>> I have timestampEnabled() and the TimeCharacteristics are EventTime,
>>>>>>>>> AutoWatermarkIntervall is 500.
>>>>>>>>>
>>>>>>>>> The problem is, when I do something like:
>>>>>>>>>
>>>>>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>>>>>>>>> .timeWindowAll(Time.of(1, TimeUnit.SECONDS)
>>>>>>>>> .sum(..)
>>>>>>>>> .print()
>>>>>>>>>
>>>>>>>>> env.execute();
>>>>>>>>>
>>>>>>>>> the windows never get triggered.
>>>>>>>>>
>>>>>>>>> If I use ProcessingTime it works.
>>>>>>>>> If I use env.fromCollection(...) instead of the KafkaSource it works
>>>>>>>>> with EventTime, too.
>>>>>>>>>
>>>>>>>>> Any ideas what I could be doing wrong are highly appreciated.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>>
>>>>>>>>> Konstantin
>>>>>>>>>
>>>>>>>>> [1]:
>>>>>>>>>
>>>>>>>>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
>>>>>>>>>
>>>>>>>>> final private long maxDelay;
>>>>>>>>>
>>>>>>>>> public  PojoTimestampExtractor(long maxDelay) {
>>>>>>>>>     this.maxDelay = maxDelay;
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> @Override
>>>>>>>>> public long extractTimestamp(Pojo fightEvent, long l) {
>>>>>>>>>     return pojo.getTime();
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> @Override
>>>>>>>>> public long extractWatermark(Pojo pojo, long l) {
>>>>>>>>>     return pojo.getTime() - maxDelay;
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> @Override
>>>>>>>>> public long getCurrentWatermark() {
>>>>>>>>>     return Long.MIN_VALUE;
>>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Konstantin Knauf * [hidden email] * +49-174-3413182
>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Konstantin Knauf * [hidden email] * +49-174-3413182
>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>
>>>>
>>>
>>> --
>>> Konstantin Knauf * [hidden email] * +49-174-3413182
>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>
>>
>
> --
> Konstantin Knauf * [hidden email] * +49-174-3413182
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082


Re: Custom TimestampExtractor and FlinkKafkaConsumer082

snntr
Hi Aljoscha,

Sure, will do. I haven't found a solution either. I won't have time to put
a minimal example together before the weekend, though.

Cheers,

Konstantin

On 25.11.2015 19:10, Aljoscha Krettek wrote:

> Hi Konstantin,
> I still didn’t come up with an explanation for the behavior. Could you maybe send me example code (and example data if it is necessary to reproduce the problem.)? This would really help me pinpoint the problem.
>
> Cheers,
> Aljoscha
>> On 17 Nov 2015, at 21:42, Konstantin Knauf <[hidden email]> wrote:
>>
>> Hi Aljoscha,
>>
>> Are you sure? I am running the job from my IDE at the moment.
>>
>> If I set
>>
>> StreamExecutionEnvironment.setParallelism(1);
>>
>> I works with the old TimestampExtractor (returning Long.MIN_VALUE from
>> getCurrentWatermark() and emitting a watermark at every record)
>>
>> If I set
>>
>> StreamExecutionEnvironment.setParallelism(5);
>>
>> it does not work.
>>
>> So, if I understood you correctly, it is the opposite of what you were
>> expecting?!
>>
>> Cheers,
>>
>> Konstantin
>>
>>
>> On 17.11.2015 11:32, Aljoscha Krettek wrote:
>>> Hi,
>>> actually, the bug is more subtle. Normally, it is not a problem that the TimestampExtractor sometimes emits a watermark that is lower than the one before. (This is the result of the bug with Long.MIN_VALUE I mentioned before). The stream operators wait for watermarks from all upstream operators and only advance the watermark monotonically in lockstep with them. This way, the watermark cannot decrease at an operator.
>>>
>>> In your case, you have a topology with parallelism 1, I assume. In that case the operators are chained. (There is no separate operators but basically only one operator and element transmission happens in function calls). In this setting the watermarks are directly forwarded to operators without going through the logic I mentioned above.
>>>
>>> Cheers,
>>> Aljoscha
>>>> On 16 Nov 2015, at 18:13, Konstantin Knauf <[hidden email]> wrote:
>>>>
>>>> Hi Aljoscha,
>>>>
>>>> I changed the Timestamp Extraktor to save the lastSeenTimestamp and only
>>>> emit with getCurrentWatermark [1] as you suggested. So basically I do
>>>> the opposite than before (only watermarks per events vs only watermarks
>>>> per autowatermark). And now it works :). The question remains, why it
>>>> did not work before. As far as I see, it is an issue with the first
>>>> TimestmapExtractor itself?!
>>>>
>>>> Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?
>>>>
>>>> Cheers,
>>>>
>>>> Konstantin
>>>>
>>>> [1]
>>>>
>>>>   final private long maxDelay;
>>>>   private long lastTimestamp = Long.MIN_VALUE;
>>>>
>>>>   public PojoTimestampExtractor(long maxDelay) {
>>>>       this.maxDelay = maxDelay;
>>>>   }
>>>>
>>>>   @Override
>>>>   public long extractTimestamp(Pojo pojo, long l) {
>>>>       lastTimestamp = pojo.getTime();
>>>>       return pojo.getTime();
>>>>   }
>>>>
>>>>   @Override
>>>>   public long extractWatermark(Pojo pojo, long l) {
>>>>       return Long.MIN_VALUE;
>>>>   }
>>>>
>>>>   @Override
>>>>   public long getCurrentWatermark() {
>>>>       return lastTimestamp - maxDelay;
>>>>   }
>>>>
>>>>
>>>> On 16.11.2015 13:37, Aljoscha Krettek wrote:
>>>>> Hi,
>>>>> yes, at your data-rate emitting a watermark for every element should not be a problem. It could become a problem with higher data-rates since the system can get overwhelmed if every element also generates a watermark. In that case I would suggest storing the lastest element-timestamp in an internal field and only emitting in getCurrentWatermark(), since then, then the watermark interval can be tunes using the auto-watermark interval setting.
>>>>>
>>>>> But that should not be the cause of the problem that you currently have. Would you maybe be willing to send me some (mock) example data and the code so that I can reproduce the problem and have a look at it? to aljoscha at apache.org.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>> On 16 Nov 2015, at 13:05, Konstantin Knauf <[hidden email]> wrote:
>>>>>>
>>>>>> Hi Aljoscha,
>>>>>>
>>>>>> ok, now I at least understand, why it works with fromElements(...). For
>>>>>> the rest I am not so sure.
>>>>>>
>>>>>>> What this means in your case is that the watermark can only advance if
>>>>>> a new element arrives, because only then is the watermark updated.
>>>>>>
>>>>>> But new elements arrive all the time, about 50/s, or do you mean
>>>>>> something else?
>>>>>>
>>>>>> getCurrentWatermark returning Long.MIN_VALUE still seems to be an ok
>>>>>> choice, if i understand the semantics correctly. It just affects
>>>>>> watermarking in the absence of events, right?
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Konstantin
>>>>>>
>>>>>>
>>>>>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
>>>>>>> Hi,
>>>>>>> it could be what Gyula mentioned. Let me first go a bit into how the TimestampExtractor works internally.
>>>>>>>
>>>>>>> First, the timestamp extractor internally keeps the value of the last emitted watermark. Then, the semantics of the TimestampExtractor are as follows :
>>>>>>> - the result of extractTimestamp is taken and it replaces the internal timestamp of the element
>>>>>>> - if the result of extractWatermark is larger than the last watermark the new value is emitted as a watermark and the value is stored
>>>>>>> - getCurrentWatermark is called on the specified auto-watermark interval, if the returned value is larger than the last watermark it is emitted and stored as last watermark
>>>>>>>
>>>>>>> What this means in your case is that the watermark can only advance if a new element arrives, because only then is the watermark updated.
>>>>>>>
>>>>>>> The reason why you see results if you use fromElements is that the window-operator also emits all the windows that it currently has buffered if the program closes. This happens in the case of fromElements because only a finite number of elements is emitted, after which the source closes, thereby finishing the whole program.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <[hidden email]> wrote:
>>>>>>>>
>>>>>>>> Could this part of the extractor be the problem Aljoscha?
>>>>>>>>
>>>>>>>> @Override
>>>>>>>>  public long getCurrentWatermark() {
>>>>>>>>      return Long.MIN_VALUE;
>>>>>>>>  }
>>>>>>>>
>>>>>>>> Gyula
>>>>>>>>
>>>>>>>> Konstantin Knauf <[hidden email]> ezt írta (időpont: 2015. nov. 16., H, 10:39):
>>>>>>>> Hi Aljoscha,
>>>>>>>>
>>>>>>>> thanks for your answer. Yes I am using the same TimestampExtractor-Class.
>>>>>>>>
>>>>>>>> The timestamps look good to me. Here an example.
>>>>>>>>
>>>>>>>> {"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00
>>>>>>>>
>>>>>>>> The order now is
>>>>>>>>
>>>>>>>> stream
>>>>>>>> .map(dummyMapper)
>>>>>>>> .assignTimestamps(...)
>>>>>>>> .timeWindow(...)
>>>>>>>>
>>>>>>>> Is there a way to print out the assigned timestamps after
>>>>>>>> stream.assignTimestamps(...)?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>>
>>>>>>>> Konstantin
>>>>>>>>
>>>>>>>>
>>>>>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
>>>>>>>>> Hi,
>>>>>>>>> are you also using the timestamp extractor when you are using env.fromCollection().
>>>>>>>>>
>>>>>>>>> Could you maybe insert a dummy mapper after the Kafka source that just prints the element and forwards it? To see if the elements come with a good timestamp from Kafka.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Aljoscha
>>>>>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf <[hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi everyone,
>>>>>>>>>>
>>>>>>>>>> I have the following issue with Flink (0.10) and Kafka.
>>>>>>>>>>
>>>>>>>>>> I am using a very simple TimestampExtractor like [1], which just
>>>>>>>>>> extracts a millis timestamp from a POJO. In my streaming job, I read in
>>>>>>>>>> these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>>>>>>>>>
>>>>>>>>>> stream = env.addSource(new FlinkKafkaConsumer082<
>>>>>>>>>> (parameterTool.getRequired("topic"),
>>>>>>>>>>             new AvroPojoDeserializationSchema(),
>>>>>>>>>> parameterTool.getProperties()))
>>>>>>>>>>
>>>>>>>>>> I have timestampEnabled() and the TimeCharacteristics are EventTime,
>>>>>>>>>> AutoWatermarkIntervall is 500.
>>>>>>>>>>
>>>>>>>>>> The problem is, when I do something like:
>>>>>>>>>>
>>>>>>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>>>>>>>>>> .timeWindowAll(Time.of(1, TimeUnit.SECONDS)
>>>>>>>>>> .sum(..)
>>>>>>>>>> .print()
>>>>>>>>>>
>>>>>>>>>> env.execute();
>>>>>>>>>>
>>>>>>>>>> the windows never get triggered.
>>>>>>>>>>
>>>>>>>>>> If I use ProcessingTime it works.
>>>>>>>>>> If I use env.fromCollection(...) instead of the KafkaSource it works
>>>>>>>>>> with EventTime, too.
>>>>>>>>>>
>>>>>>>>>> Any ideas what I could be doing wrong are highly appreciated.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>>
>>>>>>>>>> Konstantin
>>>>>>>>>>
>>>>>>>>>> [1]:
>>>>>>>>>>
>>>>>>>>>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
>>>>>>>>>>
>>>>>>>>>> final private long maxDelay;
>>>>>>>>>>
>>>>>>>>>> public  PojoTimestampExtractor(long maxDelay) {
>>>>>>>>>>     this.maxDelay = maxDelay;
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> @Override
>>>>>>>>>> public long extractTimestamp(Pojo fightEvent, long l) {
>>>>>>>>>>     return pojo.getTime();
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> @Override
>>>>>>>>>> public long extractWatermark(Pojo pojo, long l) {
>>>>>>>>>>     return pojo.getTime() - maxDelay;
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> @Override
>>>>>>>>>> public long getCurrentWatermark() {
>>>>>>>>>>     return Long.MIN_VALUE;
>>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Konstantin Knauf * [hidden email] * +49-174-3413182
>>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Konstantin Knauf * [hidden email] * +49-174-3413182
>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>>
>>>>>
>>>>
>>>> --
>>>> Konstantin Knauf * [hidden email] * +49-174-3413182
>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>
>>>
>>
>> --
>> Konstantin Knauf * [hidden email] * +49-174-3413182
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
>

--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

snntr
Hi Aljoscha,

I have put together a gist [1] with two classes: a short processing
pipeline, which shows the behavior, and a data generator that writes
records into Kafka. I hope I remembered everything we discussed correctly.

So basically, in the example it works with "TimestampExtractor1" only for
parallelism 1, while with "TimestampExtractor2" it works regardless of the
parallelism. Everything was run from the IDE.
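
For orientation, the job in the gist is roughly shaped like this (just a sketch from memory, not the exact code; Pojo, AvroPojoDeserializationSchema and the field in sum() are placeholders for my actual classes and fields):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(500);
env.setParallelism(5); // with TimestampExtractor1 it only works if this is 1

DataStream<Pojo> stream = env.addSource(new FlinkKafkaConsumer082<>(
        parameterTool.getRequired("topic"),
        new AvroPojoDeserializationSchema(),
        parameterTool.getProperties()));

stream.assignTimestamps(new PojoTimestampExtractor(6000)) // swap in TimestampExtractor1 / TimestampExtractor2 here
        .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
        .sum("value") // placeholder field
        .print();

env.execute();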

Let me know if you need anything else.

Cheers,

Konstantin

[1] https://gist.github.com/knaufk/d57b5c3c7db576f3350d

On 25.11.2015 21:15, Konstantin Knauf wrote:

> Hi Aljoscha,
>
> sure, will do. I have neither found a solution. I won't have time to put
> a minimal example together before the weekend though.
>
> Cheers,
>
> Konstantin
>
> On 25.11.2015 19:10, Aljoscha Krettek wrote:
>> Hi Konstantin,
>> I still didn’t come up with an explanation for the behavior. Could you maybe send me example code (and example data if it is necessary to reproduce the problem.)? This would really help me pinpoint the problem.
>>
>> Cheers,
>> Aljoscha
>>> On 17 Nov 2015, at 21:42, Konstantin Knauf <[hidden email]> wrote:
>>>
>>> Hi Aljoscha,
>>>
>>> Are you sure? I am running the job from my IDE at the moment.
>>>
>>> If I set
>>>
>>> StreamExecutionEnvironment.setParallelism(1);
>>>
>>> I works with the old TimestampExtractor (returning Long.MIN_VALUE from
>>> getCurrentWatermark() and emitting a watermark at every record)
>>>
>>> If I set
>>>
>>> StreamExecutionEnvironment.setParallelism(5);
>>>
>>> it does not work.
>>>
>>> So, if I understood you correctly, it is the opposite of what you were
>>> expecting?!
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>>
>>> On 17.11.2015 11:32, Aljoscha Krettek wrote:
>>>> Hi,
>>>> actually, the bug is more subtle. Normally, it is not a problem that the TimestampExtractor sometimes emits a watermark that is lower than the one before. (This is the result of the bug with Long.MIN_VALUE I mentioned before). The stream operators wait for watermarks from all upstream operators and only advance the watermark monotonically in lockstep with them. This way, the watermark cannot decrease at an operator.
>>>>
>>>> In your case, you have a topology with parallelism 1, I assume. In that case the operators are chained. (There is no separate operators but basically only one operator and element transmission happens in function calls). In this setting the watermarks are directly forwarded to operators without going through the logic I mentioned above.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>> On 16 Nov 2015, at 18:13, Konstantin Knauf <[hidden email]> wrote:
>>>>>
>>>>> Hi Aljoscha,
>>>>>
>>>>> I changed the Timestamp Extraktor to save the lastSeenTimestamp and only
>>>>> emit with getCurrentWatermark [1] as you suggested. So basically I do
>>>>> the opposite than before (only watermarks per events vs only watermarks
>>>>> per autowatermark). And now it works :). The question remains, why it
>>>>> did not work before. As far as I see, it is an issue with the first
>>>>> TimestmapExtractor itself?!
>>>>>
>>>>> Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Konstantin
>>>>>
>>>>> [1]
>>>>>
>>>>>   final private long maxDelay;
>>>>>   private long lastTimestamp = Long.MIN_VALUE;
>>>>>
>>>>>   public PojoTimestampExtractor(long maxDelay) {
>>>>>       this.maxDelay = maxDelay;
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public long extractTimestamp(Pojo pojo, long l) {
>>>>>       lastTimestamp = pojo.getTime();
>>>>>       return pojo.getTime();
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public long extractWatermark(Pojo pojo, long l) {
>>>>>       return Long.MIN_VALUE;
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public long getCurrentWatermark() {
>>>>>       return lastTimestamp - maxDelay;
>>>>>   }
>>>>>
>>>>>
>>>>> On 16.11.2015 13:37, Aljoscha Krettek wrote:
>>>>>> Hi,
>>>>>> yes, at your data-rate emitting a watermark for every element should not be a problem. It could become a problem with higher data-rates since the system can get overwhelmed if every element also generates a watermark. In that case I would suggest storing the lastest element-timestamp in an internal field and only emitting in getCurrentWatermark(), since then, then the watermark interval can be tunes using the auto-watermark interval setting.
>>>>>>
>>>>>> But that should not be the cause of the problem that you currently have. Would you maybe be willing to send me some (mock) example data and the code so that I can reproduce the problem and have a look at it? to aljoscha at apache.org.
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>> On 16 Nov 2015, at 13:05, Konstantin Knauf <[hidden email]> wrote:
>>>>>>>
>>>>>>> Hi Aljoscha,
>>>>>>>
>>>>>>> ok, now I at least understand, why it works with fromElements(...). For
>>>>>>> the rest I am not so sure.
>>>>>>>
>>>>>>>> What this means in your case is that the watermark can only advance if
>>>>>>> a new element arrives, because only then is the watermark updated.
>>>>>>>
>>>>>>> But new elements arrive all the time, about 50/s, or do you mean
>>>>>>> something else?
>>>>>>>
>>>>>>> getCurrentWatermark returning Long.MIN_VALUE still seems to be an ok
>>>>>>> choice, if i understand the semantics correctly. It just affects
>>>>>>> watermarking in the absence of events, right?
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> Konstantin
>>>>>>>
>>>>>>>
>>>>>>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
>>>>>>>> Hi,
>>>>>>>> it could be what Gyula mentioned. Let me first go a bit into how the TimestampExtractor works internally.
>>>>>>>>
>>>>>>>> First, the timestamp extractor internally keeps the value of the last emitted watermark. Then, the semantics of the TimestampExtractor are as follows :
>>>>>>>> - the result of extractTimestamp is taken and it replaces the internal timestamp of the element
>>>>>>>> - if the result of extractWatermark is larger than the last watermark the new value is emitted as a watermark and the value is stored
>>>>>>>> - getCurrentWatermark is called on the specified auto-watermark interval, if the returned value is larger than the last watermark it is emitted and stored as last watermark
>>>>>>>>
>>>>>>>> What this means in your case is that the watermark can only advance if a new element arrives, because only then is the watermark updated.
>>>>>>>>
>>>>>>>> The reason why you see results if you use fromElements is that the window-operator also emits all the windows that it currently has buffered if the program closes. This happens in the case of fromElements because only a finite number of elements is emitted, after which the source closes, thereby finishing the whole program.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>> Could this part of the extractor be the problem Aljoscha?
>>>>>>>>>
>>>>>>>>> @Override
>>>>>>>>>  public long getCurrentWatermark() {
>>>>>>>>>      return Long.MIN_VALUE;
>>>>>>>>>  }
>>>>>>>>>
>>>>>>>>> Gyula
>>>>>>>>>
>>>>>>>>> Konstantin Knauf <[hidden email]> ezt írta (időpont: 2015. nov. 16., H, 10:39):
>>>>>>>>> Hi Aljoscha,
>>>>>>>>>
>>>>>>>>> thanks for your answer. Yes I am using the same TimestampExtractor-Class.
>>>>>>>>>
>>>>>>>>> The timestamps look good to me. Here an example.
>>>>>>>>>
>>>>>>>>> {"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00
>>>>>>>>>
>>>>>>>>> The order now is
>>>>>>>>>
>>>>>>>>> stream
>>>>>>>>> .map(dummyMapper)
>>>>>>>>> .assignTimestamps(...)
>>>>>>>>> .timeWindow(...)
>>>>>>>>>
>>>>>>>>> Is there a way to print out the assigned timestamps after
>>>>>>>>> stream.assignTimestamps(...)?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>>
>>>>>>>>> Konstantin
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
>>>>>>>>>> Hi,
>>>>>>>>>> are you also using the timestamp extractor when you are using env.fromCollection().
>>>>>>>>>>
>>>>>>>>>> Could you maybe insert a dummy mapper after the Kafka source that just prints the element and forwards it? To see if the elements come with a good timestamp from Kafka.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Aljoscha
>>>>>>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf <[hidden email]> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>
>>>>>>>>>>> I have the following issue with Flink (0.10) and Kafka.
>>>>>>>>>>>
>>>>>>>>>>> I am using a very simple TimestampExtractor like [1], which just
>>>>>>>>>>> extracts a millis timestamp from a POJO. In my streaming job, I read in
>>>>>>>>>>> these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>>>>>>>>>>
>>>>>>>>>>> stream = env.addSource(new FlinkKafkaConsumer082<
>>>>>>>>>>> (parameterTool.getRequired("topic"),
>>>>>>>>>>>             new AvroPojoDeserializationSchema(),
>>>>>>>>>>> parameterTool.getProperties()))
>>>>>>>>>>>
>>>>>>>>>>> I have timestampEnabled() and the TimeCharacteristics are EventTime,
>>>>>>>>>>> AutoWatermarkIntervall is 500.
>>>>>>>>>>>
>>>>>>>>>>> The problem is, when I do something like:
>>>>>>>>>>>
>>>>>>>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>>>>>>>>>>> .timeWindowAll(Time.of(1, TimeUnit.SECONDS)
>>>>>>>>>>> .sum(..)
>>>>>>>>>>> .print()
>>>>>>>>>>>
>>>>>>>>>>> env.execute();
>>>>>>>>>>>
>>>>>>>>>>> the windows never get triggered.
>>>>>>>>>>>
>>>>>>>>>>> If I use ProcessingTime it works.
>>>>>>>>>>> If I use env.fromCollection(...) instead of the KafkaSource it works
>>>>>>>>>>> with EventTime, too.
>>>>>>>>>>>
>>>>>>>>>>> Any ideas what I could be doing wrong are highly appreciated.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>>
>>>>>>>>>>> Konstantin
>>>>>>>>>>>
>>>>>>>>>>> [1]:
>>>>>>>>>>>
>>>>>>>>>>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
>>>>>>>>>>>
>>>>>>>>>>> final private long maxDelay;
>>>>>>>>>>>
>>>>>>>>>>> public  PojoTimestampExtractor(long maxDelay) {
>>>>>>>>>>>     this.maxDelay = maxDelay;
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> @Override
>>>>>>>>>>> public long extractTimestamp(Pojo fightEvent, long l) {
>>>>>>>>>>>     return pojo.getTime();
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> @Override
>>>>>>>>>>> public long extractWatermark(Pojo pojo, long l) {
>>>>>>>>>>>     return pojo.getTime() - maxDelay;
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> @Override
>>>>>>>>>>> public long getCurrentWatermark() {
>>>>>>>>>>>     return Long.MIN_VALUE;
>>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Konstantin Knauf * [hidden email] * +49-174-3413182
>>>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Konstantin Knauf * [hidden email] * +49-174-3413182
>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Konstantin Knauf * [hidden email] * +49-174-3413182
>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>
>>>>
>>>
>>> --
>>> Konstantin Knauf * [hidden email] * +49-174-3413182
>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>
>>
>

--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

Aljoscha Krettek
Hi Konstantin,
I finally nailed down the problem. :-)

The root of the problem is a mismatch between the parallelism of the Flink Kafka Consumer and the number of partitions of the Kafka topic. I would assume that in your case the topic has 1 partition. This means that only one of the parallel instances of the Flink Kafka Consumer ever receives elements, which in turn means that only one of the parallel instances of the timestamp extractor ever receives elements. Consequently, no watermarks get emitted by the other parallel instances, so the watermark does not advance downstream, because the watermark at an operator is the minimum over all upstream watermarks. This explains why ExampleTimestampExtractor1 only works with parallelism=1.
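
A quick way to check this hypothesis would be to pin only the source to a parallelism that matches the number of partitions of the topic, so that no consumer instance stays idle. This is just a sketch of the idea (untested against your job, reusing the class names from your earlier mails):

DataStream<Pojo> stream = env
        .addSource(new FlinkKafkaConsumer082<>(
                parameterTool.getRequired("topic"),
                new AvroPojoDeserializationSchema(),
                parameterTool.getProperties()))
        .setParallelism(1); // one source instance per Kafka partition, so none of them stays idle

// The rest of the job can keep the higher parallelism: the single source's output
// is redistributed round-robin, so every timestamp extractor instance sees data,
// emits watermarks, and the minimum at the window operator can advance.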

The reason why ExampleTimestampExtractor2 works in all parallelism settings is not very obvious. The secret is in this method:

@Override
public long getCurrentWatermark() {
   return lastTimestamp - maxDelay;
}

In the parallel instances that never receive any element, lastTimestamp stays at Long.MIN_VALUE. So “lastTimestamp - maxDelay” underflows and wraps around to a value just below Long.MAX_VALUE (namely Long.MAX_VALUE - maxDelay + 1). And because the watermark at an operator is always the minimum over all watermarks from upstream operators, the watermark at the window operator simply tracks the watermark of the one parallel instance that actually receives elements.
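
If you want to convince yourself of the wrap-around, here is a tiny standalone snippet (plain Java, nothing Flink-specific; the timestamp is just the one from your example record, maxDelay the 6000 from your job):

public class WatermarkWrapAroundDemo {
    public static void main(String[] args) {
        long maxDelay = 6000L;

        // instance that never saw an element: lastTimestamp is still Long.MIN_VALUE
        long idleWatermark = Long.MIN_VALUE - maxDelay;   // wraps around to Long.MAX_VALUE - maxDelay + 1
        // instance that saw the example record from your earlier mail
        long activeWatermark = 1447666537260L - maxDelay;

        // a downstream operator advances to the minimum over all upstream watermarks
        long windowWatermark = Math.min(idleWatermark, activeWatermark);

        System.out.println(idleWatermark);    // 9223372036854769808
        System.out.println(activeWatermark);  // 1447666531260
        System.out.println(windowWatermark);  // 1447666531260 -> tracks the instance that has data
    }
}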

I hope this helps, but please let me know if I should provide more explanation. This is a very tricky topic.

Cheers,
Aljoscha

> On 29 Nov 2015, at 21:18, Konstantin Knauf <[hidden email]> wrote:
>
> Hi Aljoscha,
>
> I have put together a gist [1] with two classes, a short processing
> pipeline, which shows the behavior and a data generator to write records
> into Kafka. I hope I remembered everything we discussed correctly.
>
> So basically in the example it works with "TimestampExtractor1" only for
> parallelism 1, with "TimestampExtractor2" it works regardless of the
> parallelism. Run from the IDE.
>
> Let me know if you need anything else.
>
> Cheers,
>
> Konstantin
>
> [1] https://gist.github.com/knaufk/d57b5c3c7db576f3350d
>
> On 25.11.2015 21:15, Konstantin Knauf wrote:
>> Hi Aljoscha,
>>
>> sure, will do. I have neither found a solution. I won't have time to put
>> a minimal example together before the weekend though.
>>
>> Cheers,
>>
>> Konstantin
>>
>> On 25.11.2015 19:10, Aljoscha Krettek wrote:
>>> Hi Konstantin,
>>> I still didn’t come up with an explanation for the behavior. Could you maybe send me example code (and example data if it is necessary to reproduce the problem.)? This would really help me pinpoint the problem.
>>>
>>> Cheers,
>>> Aljoscha
>>>> On 17 Nov 2015, at 21:42, Konstantin Knauf <[hidden email]> wrote:
>>>>
>>>> Hi Aljoscha,
>>>>
>>>> Are you sure? I am running the job from my IDE at the moment.
>>>>
>>>> If I set
>>>>
>>>> StreamExecutionEnvironment.setParallelism(1);
>>>>
>>>> I works with the old TimestampExtractor (returning Long.MIN_VALUE from
>>>> getCurrentWatermark() and emitting a watermark at every record)
>>>>
>>>> If I set
>>>>
>>>> StreamExecutionEnvironment.setParallelism(5);
>>>>
>>>> it does not work.
>>>>
>>>> So, if I understood you correctly, it is the opposite of what you were
>>>> expecting?!
>>>>
>>>> Cheers,
>>>>
>>>> Konstantin
>>>>
>>>>
>>>> On 17.11.2015 11:32, Aljoscha Krettek wrote:
>>>>> Hi,
>>>>> actually, the bug is more subtle. Normally, it is not a problem that the TimestampExtractor sometimes emits a watermark that is lower than the one before. (This is the result of the bug with Long.MIN_VALUE I mentioned before). The stream operators wait for watermarks from all upstream operators and only advance the watermark monotonically in lockstep with them. This way, the watermark cannot decrease at an operator.
>>>>>
>>>>> In your case, you have a topology with parallelism 1, I assume. In that case the operators are chained. (There is no separate operators but basically only one operator and element transmission happens in function calls). In this setting the watermarks are directly forwarded to operators without going through the logic I mentioned above.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>> On 16 Nov 2015, at 18:13, Konstantin Knauf <[hidden email]> wrote:
>>>>>>
>>>>>> Hi Aljoscha,
>>>>>>
>>>>>> I changed the Timestamp Extraktor to save the lastSeenTimestamp and only
>>>>>> emit with getCurrentWatermark [1] as you suggested. So basically I do
>>>>>> the opposite than before (only watermarks per events vs only watermarks
>>>>>> per autowatermark). And now it works :). The question remains, why it
>>>>>> did not work before. As far as I see, it is an issue with the first
>>>>>> TimestmapExtractor itself?!
>>>>>>
>>>>>> Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Konstantin
>>>>>>
>>>>>> [1]
>>>>>>
>>>>>>  final private long maxDelay;
>>>>>>  private long lastTimestamp = Long.MIN_VALUE;
>>>>>>
>>>>>>  public PojoTimestampExtractor(long maxDelay) {
>>>>>>      this.maxDelay = maxDelay;
>>>>>>  }
>>>>>>
>>>>>>  @Override
>>>>>>  public long extractTimestamp(Pojo pojo, long l) {
>>>>>>      lastTimestamp = pojo.getTime();
>>>>>>      return pojo.getTime();
>>>>>>  }
>>>>>>
>>>>>>  @Override
>>>>>>  public long extractWatermark(Pojo pojo, long l) {
>>>>>>      return Long.MIN_VALUE;
>>>>>>  }
>>>>>>
>>>>>>  @Override
>>>>>>  public long getCurrentWatermark() {
>>>>>>      return lastTimestamp - maxDelay;
>>>>>>  }
>>>>>>
>>>>>>

Reply | Threaded
Open this post in threaded view
|

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

Aljoscha Krettek
Hi,
as an addition: I don’t have a solution yet for the general problem of what happens when a parallel instance of a source never receives elements. This watermark business is very tricky...

Cheers,
Aljoscha

> On 30 Nov 2015, at 17:20, Aljoscha Krettek <[hidden email]> wrote:
>
> Hi Konstantin,
> I finally nailed down the problem. :-)
>
> The basis of the problem is a mismatch between the parallelism of the Flink Kafka Consumer and the number of partitions in the Kafka stream. I would assume that in your case the Kafka stream has 1 partition. This means that only one of the parallel instances of the Flink Kafka Consumer ever receives elements, which in turn means that only one of the parallel instances of the timestamp extractor ever receives elements. Consequently, no watermarks get emitted for the other parallel instances, so the watermark does not advance downstream, because the watermark at an operator is the minimum over all upstream watermarks. This explains why ExampleTimestampExtractor1 only works with parallelism=1.
>
> The reason why ExampleTimestampExtractor2 works in all parallelism settings is not very obvious. The secret is in this method:
>
> @Override
> public long getCurrentWatermark() {
>   return lastTimestamp - maxDelay;
> }
>
> In the parallel instances that never receive any element, lastTimestamp stays at Long.MIN_VALUE, so “lastTimestamp - maxDelay” underflows and wraps around to (Long.MAX_VALUE - maxDelay + 1). Now, because the watermark at an operator is always the minimum over all watermarks from upstream operators, the watermark at the window operator always tracks the watermark of the parallel instance that actually receives elements.
>
> I hope this helps, but please let me know if I should provide more explanation. This is a very tricky topic.
>
> Cheers,
> Aljoscha
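
To make the wrap-around concrete, here is a minimal plain-Java sketch (no Flink APIs involved; the class and variable names are purely illustrative) of why an idle parallel instance of the second extractor never holds back the minimum watermark at a downstream operator:

public class WatermarkUnderflowDemo {

    public static void main(String[] args) {
        long maxDelay = 6000L;

        // Idle instance: it never saw an element, so lastTimestamp is still Long.MIN_VALUE.
        long idleLastTimestamp = Long.MIN_VALUE;
        // The subtraction underflows and wraps around to a value close to Long.MAX_VALUE.
        long idleWatermark = idleLastTimestamp - maxDelay;   // == Long.MAX_VALUE - maxDelay + 1

        // Active instance: it has seen an element with this event-time timestamp.
        long activeLastTimestamp = 1447666537260L;
        long activeWatermark = activeLastTimestamp - maxDelay;

        // A downstream operator only advances to the minimum over all upstream watermarks,
        // so the huge "idle" watermark never wins and the active instance drives progress.
        long operatorWatermark = Math.min(idleWatermark, activeWatermark);

        System.out.println("idle   watermark: " + idleWatermark);
        System.out.println("active watermark: " + activeWatermark);
        System.out.println("operator watermark (min): " + operatorWatermark);
    }
}

Running it shows that the operator watermark equals the watermark of the active instance, i.e. the idle instance is effectively ignored.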

Reply | Threaded
Open this post in threaded view
|

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

Gyula Fóra
Hi,

I think what we will need at some point for this are approximate watermarks that correlate event and ingest time.

I think they have similar concepts in Millwheel/Dataflow.

Cheers,
Gyula
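
A rough, hypothetical sketch of that idea in plain Java (this is not an existing Flink interface; ApproximateWatermarkTracker, onElement and currentWatermark are made-up names for illustration): the tracker remembers the highest event time seen and the wall-clock time at which it was ingested, and derives an approximate watermark from both, so an instance that stops receiving elements still lets event time advance.

public class ApproximateWatermarkTracker {

    private final long maxDelayMillis;                 // allowed event-time lag
    private long maxSeenEventTime = Long.MIN_VALUE;    // highest event timestamp so far
    private long lastIngestTime = Long.MIN_VALUE;      // wall-clock time of the last element

    public ApproximateWatermarkTracker(long maxDelayMillis) {
        this.maxDelayMillis = maxDelayMillis;
    }

    // Called for every element that is ingested.
    public void onElement(long eventTimeMillis) {
        maxSeenEventTime = Math.max(maxSeenEventTime, eventTimeMillis);
        lastIngestTime = System.currentTimeMillis();
    }

    // Called periodically, e.g. on the auto-watermark interval.
    public long currentWatermark() {
        if (lastIngestTime == Long.MIN_VALUE) {
            // Never saw an element: do not hold anyone back.
            return Long.MAX_VALUE;
        }
        // Let the watermark follow wall-clock time, shifted by the event/ingest offset
        // observed so far, minus the allowed delay.
        long estimatedEventTime = maxSeenEventTime + (System.currentTimeMillis() - lastIngestTime);
        return estimatedEventTime - maxDelayMillis;
    }
}

The trade-off is that such a watermark is only an estimate: if the offset between event time and ingest time changes (for example while replaying a backlog), windows can be triggered too early, so the allowed delay has to be chosen with some slack.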

Reply | Threaded
Open this post in threaded view
|

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

Aljoscha Krettek
Maybe. In the Kafka case we just need to ensure that parallel instances of the source that know that they don’t have any partitions assigned to them emit Long.MAX_VALUE as a watermark.
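
A hedged sketch of that idea (illustrative pseudo-Java only, not the actual FlinkKafkaConsumer082 code; IdlePartitionSketch, WatermarkEmitter and runSubtask are made up for this example):

import java.util.List;

public class IdlePartitionSketch {

    // Hypothetical callback standing in for the source context's watermark emission.
    public interface WatermarkEmitter {
        void emitWatermark(long timestamp);
    }

    public static void runSubtask(List<Integer> assignedPartitions, WatermarkEmitter out) {
        if (assignedPartitions.isEmpty()) {
            // This subtask will never see elements; declare its event time as "complete"
            // so it cannot hold back the minimum watermark at downstream operators.
            out.emitWatermark(Long.MAX_VALUE);
            return;
        }
        // ... otherwise consume the assigned partitions and emit records and
        // watermarks as usual.
    }
}

The assumption here is that a subtask can find out at startup that no partitions were assigned to it; if partitions could be assigned later, the Long.MAX_VALUE shortcut would no longer be safe.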


Reply | Threaded
Open this post in threaded view
|

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

rmetzger0
I think we need to find a solution for this problem soon. 
Another user is most likely affected: http://stackoverflow.com/q/34090808/568695

I've filed a JIRA for the problem: https://issues.apache.org/jira/browse/FLINK-3121


On Mon, Nov 30, 2015 at 5:58 PM, Aljoscha Krettek <[hidden email]> wrote:
Maybe. In the Kafka case we just need to ensure that parallel instances of the source that know that they don’t have any partitions assigned to them emit Long.MAX_VALUE as a watermark.

> On 30 Nov 2015, at 17:50, Gyula Fóra <[hidden email]> wrote:
>
> Hi,
>
> I think what we will need at some point for this are approximate watermarks that correlate event and ingest time.
>
> I think they have similar concepts in Millwheel/Dataflow.
>
> Cheers,
> Gyula
> On Mon, Nov 30, 2015 at 5:29 PM Aljoscha Krettek <[hidden email]> wrote:
> Hi,
> as an addition: I don’t have a solution yet for the general problem of what happens when a parallel instance of a source never receives elements. This watermark business is very tricky...
>
> Cheers,
> Aljoscha
> > On 30 Nov 2015, at 17:20, Aljoscha Krettek <[hidden email]> wrote:
> >
> > Hi Konstantin,
> > I finally nailed down the problem. :-)
> >
> > The basis of the problem is a mismatch between the parallelism of the Flink Kafka Consumer and the number of partitions in the Kafka stream. I would assume that in your case the Kafka stream has 1 partition. This means that only one of the parallel instances of the Flink Kafka Consumer ever receives elements, which in turn means that only one of the parallel instances of the timestamp extractor ever receives elements. Consequently, no watermarks get emitted for the other parallel instances, so the watermark does not advance downstream, because the watermark at an operator is the minimum over all upstream watermarks. This explains why ExampleTimestampExtractor1 only works with parallelism=1.
> >
> > The reason why ExampleTimestampExtractor2 works in all parallelism settings is not very obvious. The secret is in this method:
> >
> > @Override
> > public long getCurrentWatermark() {
> >   return lastTimestamp - maxDelay;
> > }
> >
> > In the parallel instances that never receive any element, lastTimestamp stays at Long.MIN_VALUE, so “lastTimestamp - maxDelay” underflows and wraps around to (Long.MAX_VALUE - maxDelay + 1). Now, because the watermark at an operator is always the minimum over all watermarks from upstream operators, the watermark at the window operator always tracks the watermark of the parallel instance that actually receives elements.
> >
> > I hope this helps, but please let me know if I should provide more explanation. This is a very tricky topic.
> >
> > Cheers,
> > Aljoscha
> >
> >> On 29 Nov 2015, at 21:18, Konstantin Knauf <[hidden email]> wrote:
> >>
> >> Hi Aljoscha,
> >>
> >> I have put together a gist [1] with two classes: a short processing
> >> pipeline, which shows the behavior, and a data generator to write records
> >> into Kafka. I hope I remembered everything we discussed correctly.
> >>
> >> So basically in the example it works with "TimestampExtractor1" only for
> >> parallelism 1, whereas with "TimestampExtractor2" it works regardless of the
> >> parallelism. Run it from the IDE.
> >>
> >> Let me know if you need anything else.
> >>
> >> Cheers,
> >>
> >> Konstantin
> >>
> >> [1] https://gist.github.com/knaufk/d57b5c3c7db576f3350d
> >>
> >> On 25.11.2015 21:15, Konstantin Knauf wrote:
> >>> Hi Aljoscha,
> >>>
> >>> sure, will do. I haven't found a solution either. I won't have time to put
> >>> a minimal example together before the weekend though.
> >>>
> >>> Cheers,
> >>>
> >>> Konstantin
> >>>
> >>> On 25.11.2015 19:10, Aljoscha Krettek wrote:
> >>>> Hi Konstantin,
> >>>> I still didn’t come up with an explanation for the behavior. Could you maybe send me example code (and example data, if it is necessary to reproduce the problem)? This would really help me pinpoint the problem.
> >>>>
> >>>> Cheers,
> >>>> Aljoscha
> >>>>> On 17 Nov 2015, at 21:42, Konstantin Knauf <[hidden email]> wrote:
> >>>>>
> >>>>> Hi Aljoscha,
> >>>>>
> >>>>> Are you sure? I am running the job from my IDE at the moment.
> >>>>>
> >>>>> If I set
> >>>>>
> >>>>> StreamExecutionEnvironment.setParallelism(1);
> >>>>>
> >>>>> It works with the old TimestampExtractor (returning Long.MIN_VALUE from
> >>>>> getCurrentWatermark() and emitting a watermark at every record).
> >>>>>
> >>>>> If I set
> >>>>>
> >>>>> StreamExecutionEnvironment.setParallelism(5);
> >>>>>
> >>>>> it does not work.
> >>>>>
> >>>>> So, if I understood you correctly, it is the opposite of what you were
> >>>>> expecting?!
> >>>>>
> >>>>> Cheers,
> >>>>>
> >>>>> Konstantin
> >>>>>
> >>>>>
> >>>>> On 17.11.2015 11:32, Aljoscha Krettek wrote:
> >>>>>> Hi,
> >>>>>> actually, the bug is more subtle. Normally, it is not a problem that the TimestampExtractor sometimes emits a watermark that is lower than the one before. (This is the result of the bug with Long.MIN_VALUE I mentioned before). The stream operators wait for watermarks from all upstream operators and only advance the watermark monotonically in lockstep with them. This way, the watermark cannot decrease at an operator.
> >>>>>>
> >>>>>> In your case, you have a topology with parallelism 1, I assume. In that case the operators are chained. (There are no separate operators but basically only one operator, and element transmission happens via function calls.) In this setting the watermarks are forwarded directly to operators without going through the logic I mentioned above.
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Aljoscha
> >>>>>>> On 16 Nov 2015, at 18:13, Konstantin Knauf <[hidden email]> wrote:
> >>>>>>>
> >>>>>>> Hi Aljoscha,
> >>>>>>>
> >>>>>>> I changed the TimestampExtractor to save the lastSeenTimestamp and only
> >>>>>>> emit with getCurrentWatermark [1], as you suggested. So basically I do
> >>>>>>> the opposite of before (watermarks only per auto-watermark interval instead
> >>>>>>> of only per event). And now it works :). The question remains why it
> >>>>>>> did not work before. As far as I can see, it is an issue with the first
> >>>>>>> TimestampExtractor itself?!
> >>>>>>>
> >>>>>>> Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>>
> >>>>>>> Konstantin
> >>>>>>>
> >>>>>>> [1]
> >>>>>>>
> >>>>>>> final private long maxDelay;
> >>>>>>> private long lastTimestamp = Long.MIN_VALUE;
> >>>>>>>
> >>>>>>> public PojoTimestampExtractor(long maxDelay) {
> >>>>>>>     this.maxDelay = maxDelay;
> >>>>>>> }
> >>>>>>>
> >>>>>>> @Override
> >>>>>>> public long extractTimestamp(Pojo pojo, long l) {
> >>>>>>>     lastTimestamp = pojo.getTime();
> >>>>>>>     return pojo.getTime();
> >>>>>>> }
> >>>>>>>
> >>>>>>> @Override
> >>>>>>> public long extractWatermark(Pojo pojo, long l) {
> >>>>>>>     return Long.MIN_VALUE;
> >>>>>>> }
> >>>>>>>
> >>>>>>> @Override
> >>>>>>> public long getCurrentWatermark() {
> >>>>>>>     return lastTimestamp - maxDelay;
> >>>>>>> }
> >>>>>>>
> >>>>>>>
> >>>>>>> On 16.11.2015 13:37, Aljoscha Krettek wrote:
> >>>>>>>> Hi,
> >>>>>>>> yes, at your data-rate emitting a watermark for every element should not be a problem. It could become a problem at higher data-rates, since the system can get overwhelmed if every element also generates a watermark. In that case I would suggest storing the latest element timestamp in an internal field and only emitting in getCurrentWatermark(), since then the watermark interval can be tuned using the auto-watermark interval setting.
> >>>>>>>>
> >>>>>>>> But that should not be the cause of the problem that you currently have. Would you maybe be willing to send me some (mock) example data and the code so that I can reproduce the problem and have a look at it? You can send it to aljoscha at apache.org.
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Aljoscha
> >>>>>>>>> On 16 Nov 2015, at 13:05, Konstantin Knauf <[hidden email]> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Aljoscha,
> >>>>>>>>>
> >>>>>>>>> OK, now I at least understand why it works with fromElements(...). For
> >>>>>>>>> the rest I am not so sure.
> >>>>>>>>>
> >>>>>>>>>> What this means in your case is that the watermark can only advance if
> >>>>>>>>> a new element arrives, because only then is the watermark updated.
> >>>>>>>>>
> >>>>>>>>> But new elements arrive all the time, about 50/s, or do you mean
> >>>>>>>>> something else?
> >>>>>>>>>
> >>>>>>>>> getCurrentWatermark returning Long.MIN_VALUE still seems to be an OK
> >>>>>>>>> choice, if I understand the semantics correctly. It just affects
> >>>>>>>>> watermarking in the absence of events, right?
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>>
> >>>>>>>>> Konstantin
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
> >>>>>>>>>> Hi,
> >>>>>>>>>> it could be what Gyula mentioned. Let me first go a bit into how the TimestampExtractor works internally.
> >>>>>>>>>>
> >>>>>>>>>> First, the timestamp extractor internally keeps the value of the last emitted watermark. Then, the semantics of the TimestampExtractor are as follows (sketched in code below):
> >>>>>>>>>> - the result of extractTimestamp is taken and replaces the internal timestamp of the element
> >>>>>>>>>> - if the result of extractWatermark is larger than the last watermark, the new value is emitted as a watermark and stored
> >>>>>>>>>> - getCurrentWatermark is called at the specified auto-watermark interval; if the returned value is larger than the last watermark, it is emitted and stored as the last watermark
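> >>>>>>>>>>
> >>>>>>>>>> Roughly, a simplified sketch of how the runtime drives the extractor (a made-up driver class reusing your PojoTimestampExtractor; not the actual operator code):
> >>>>>>>>>>
> >>>>>>>>>> class ExtractorDriverSketch {
> >>>>>>>>>>     private final PojoTimestampExtractor extractor = new PojoTimestampExtractor(6000);
> >>>>>>>>>>     private long lastEmittedWatermark = Long.MIN_VALUE;
> >>>>>>>>>>
> >>>>>>>>>>     void onElement(Pojo element, long previousTimestamp) {
> >>>>>>>>>>         // 1) extractTimestamp replaces the element's internal timestamp
> >>>>>>>>>>         long newTimestamp = extractor.extractTimestamp(element, previousTimestamp);
> >>>>>>>>>>         // 2) extractWatermark is only emitted if it is larger than the last one
> >>>>>>>>>>         maybeEmit(extractor.extractWatermark(element, newTimestamp));
> >>>>>>>>>>     }
> >>>>>>>>>>
> >>>>>>>>>>     void onAutoWatermarkInterval() {
> >>>>>>>>>>         // 3) getCurrentWatermark is polled at the auto-watermark interval
> >>>>>>>>>>         maybeEmit(extractor.getCurrentWatermark());
> >>>>>>>>>>     }
> >>>>>>>>>>
> >>>>>>>>>>     private void maybeEmit(long candidate) {
> >>>>>>>>>>         if (candidate > lastEmittedWatermark) {
> >>>>>>>>>>             lastEmittedWatermark = candidate;
> >>>>>>>>>>             System.out.println("watermark: " + candidate); // stand-in for forwarding downstream
> >>>>>>>>>>         }
> >>>>>>>>>>     }
> >>>>>>>>>> }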
> >>>>>>>>>>
> >>>>>>>>>> What this means in your case is that the watermark can only advance if a new element arrives, because only then is the watermark updated.
> >>>>>>>>>>
> >>>>>>>>>> The reason why you see results when you use fromElements is that the window operator also emits all the windows it currently has buffered when the program finishes. This happens in the case of fromElements because only a finite number of elements is emitted, after which the source closes, thereby finishing the whole program.
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Aljoscha
> >>>>>>>>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <[hidden email]> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Could this part of the extractor be the problem Aljoscha?
> >>>>>>>>>>>
> >>>>>>>>>>> @Override
> >>>>>>>>>>> public long getCurrentWatermark() {
> >>>>>>>>>>>    return Long.MIN_VALUE;
> >>>>>>>>>>> }
> >>>>>>>>>>>
> >>>>>>>>>>> Gyula
> >>>>>>>>>>>
> >>>>>>>>>>> Konstantin Knauf <[hidden email]> ezt írta (időpont: 2015. nov. 16., H, 10:39):
> >>>>>>>>>>> Hi Aljoscha,
> >>>>>>>>>>>
> >>>>>>>>>>> thanks for your answer. Yes, I am using the same TimestampExtractor class.
> >>>>>>>>>>>
> >>>>>>>>>>> The timestamps look good to me. Here an example.
> >>>>>>>>>>>
> >>>>>>>>>>> {"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00
> >>>>>>>>>>>
> >>>>>>>>>>> The order now is
> >>>>>>>>>>>
> >>>>>>>>>>> stream
> >>>>>>>>>>> .map(dummyMapper)
> >>>>>>>>>>> .assignTimestamps(...)
> >>>>>>>>>>> .timeWindow(...)
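> >>>>>>>>>>>
> >>>>>>>>>>> where dummyMapper is just an identity map that logs each element, along these lines (illustrative sketch, assuming the same Pojo type as in the extractor):
> >>>>>>>>>>>
> >>>>>>>>>>> import org.apache.flink.api.common.functions.MapFunction;
> >>>>>>>>>>>
> >>>>>>>>>>> MapFunction<Pojo, Pojo> dummyMapper = new MapFunction<Pojo, Pojo>() {
> >>>>>>>>>>>     @Override
> >>>>>>>>>>>     public Pojo map(Pojo value) throws Exception {
> >>>>>>>>>>>         System.out.println("element time: " + value.getTime()); // check the raw timestamp
> >>>>>>>>>>>         return value; // forward unchanged
> >>>>>>>>>>>     }
> >>>>>>>>>>> };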
> >>>>>>>>>>>
> >>>>>>>>>>> Is there a way to print out the assigned timestamps after
> >>>>>>>>>>> stream.assignTimestamps(...)?
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>>
> >>>>>>>>>>> Konstantin
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
> >>>>>>>>>>>> Hi,
> >>>>>>>>>>>> are you also using the timestamp extractor when you are using env.fromCollection().
> >>>>>>>>>>>>
> >>>>>>>>>>>> Could you maybe insert a dummy mapper after the Kafka source that just prints the element and forwards it? To see if the elements come with a good timestamp from Kafka.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers,
> >>>>>>>>>>>> Aljoscha
> >>>>>>>>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf <[hidden email]> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I have the following issue with Flink (0.10) and Kafka.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I am using a very simple TimestampExtractor like [1], which just
> >>>>>>>>>>>>> extracts a millis timestamp from a POJO. In my streaming job, I read in
> >>>>>>>>>>>>> these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> stream = env.addSource(new FlinkKafkaConsumer082<
> >>>>>>>>>>>>> (parameterTool.getRequired("topic"),
> >>>>>>>>>>>>>           new AvroPojoDeserializationSchema(),
> >>>>>>>>>>>>> parameterTool.getProperties()))
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I have timestampEnabled() and the TimeCharacteristics are EventTime,
> >>>>>>>>>>>>> AutoWatermarkIntervall is 500.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The problem is, when I do something like:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
> >>>>>>>>>>>>> .timeWindowAll(Time.of(1, TimeUnit.SECONDS)
> >>>>>>>>>>>>> .sum(..)
> >>>>>>>>>>>>> .print()
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> env.execute();
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> the windows never get triggered.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> If I use ProcessingTime it works.
> >>>>>>>>>>>>> If I use env.fromCollection(...) instead of the KafkaSource it works
> >>>>>>>>>>>>> with EventTime, too.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Any ideas what I could be doing wrong are highly appreciated.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Konstantin
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> [1]:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> final private long maxDelay;
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> public  PojoTimestampExtractor(long maxDelay) {
> >>>>>>>>>>>>>   this.maxDelay = maxDelay;
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> @Override
> >>>>>>>>>>>>> public long extractTimestamp(Pojo fightEvent, long l) {
> >>>>>>>>>>>>>   return pojo.getTime();
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> @Override
> >>>>>>>>>>>>> public long extractWatermark(Pojo pojo, long l) {
> >>>>>>>>>>>>>   return pojo.getTime() - maxDelay;
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> @Override
> >>>>>>>>>>>>> public long getCurrentWatermark() {
> >>>>>>>>>>>>>   return Long.MIN_VALUE;
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>> Konstantin Knauf * [hidden email] * +49-174-3413182
> >>>>>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> >>>>>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> >>>>>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Konstantin Knauf * [hidden email] * +49-174-3413182
> >>>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> >>>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> >>>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> Konstantin Knauf * [hidden email] * +49-174-3413182
> >>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> >>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> >>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> >>>>>>
> >>>>>>
> >>>>>
> >>>>> --
> >>>>> Konstantin Knauf * [hidden email] * +49-174-3413182
> >>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> >>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> >>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> >>>>
> >>>>
> >>>
> >>
> >> --
> >> Konstantin Knauf * [hidden email] * +49-174-3413182
> >> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> >> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> >> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> >
>

