BoundedOutOfOrdernessTimestampExtractor and timestamps in the future

Dominik Bruhn
Hey,
I'm using a BoundedOutOfOrdernessTimestampExtractor for assigning my
timestamps and discarding events that are too old (which happens sometimes).

Now my problem is that some events accidentally have timestamps in the
future. If these timestamps are further in the future than my
`maxOutOfOrderness`, the watermark jumps ahead and I end up discarding
valid events. So I need a way of telling the
BoundedOutOfOrdernessTimestampExtractor to exclude timestamps from the
future from the watermark calculation. I still want to keep the events
that are in the future and assign them to the right windows.
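To make the effect concrete, here is a minimal plain-Java model of the watermark rule that BoundedOutOfOrdernessTimestampExtractor applies: largest timestamp seen so far minus `maxOutOfOrderness`. It deliberately does not depend on Flink so it runs standalone. The class names, the millisecond values and the simplification that the watermark is recomputed after every element (Flink emits periodic watermarks at an interval instead) are assumptions for illustration.

```java
// Plain-Java model of the watermark arithmetic in BoundedOutOfOrdernessTimestampExtractor.
// Not Flink code: it only reproduces "largest timestamp seen - maxOutOfOrderness".
final class BoundedOutOfOrdernessModel {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;

    BoundedOutOfOrdernessModel(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Chosen so that the initial watermark is Long.MIN_VALUE without underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    // Registers an element and returns true if it was already behind the watermark
    // (timestamp <= watermark), i.e. a candidate for being dropped as late.
    boolean onEvent(long timestamp) {
        boolean behindWatermark = timestamp <= currentWatermark();
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
        return behindWatermark;
    }
}

final class BoundedOutOfOrdernessDemo {
    public static void main(String[] args) {
        BoundedOutOfOrdernessModel model = new BoundedOutOfOrdernessModel(5_000L);
        // The third timestamp is a "future" outlier roughly one hour ahead of the others.
        long[] timestamps = {10_000L, 12_000L, 3_600_000L, 13_000L};
        for (long ts : timestamps) {
            boolean late = model.onEvent(ts);
            System.out.println("ts=" + ts + " late=" + late + " watermark=" + model.currentWatermark());
        }
    }
}
```

In the demo, the single outlier pushes the watermark to 3,595,000 ms, so the perfectly ordinary event at 13,000 ms that follows is already behind the watermark.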

How can I achieve this? I thought about checking whether a candidate
timestamp is in the future before considering it for the watermark. I
cloned the BoundedOutOfOrdernessTimestampExtractor and added this idea:
https://gist.github.com/theomega/090054032e0b3c3b9fb06767f0fec7e7
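A minimal sketch of the described idea, assuming "future" means "ahead of the local wall clock": every element keeps its original timestamp, but only timestamps that are not ahead of processing time plus a hypothetical tolerance `maxFutureSkew` may raise the maximum used for the watermark. This is a plain-Java model, not the code from the linked gist; the class names, the injectable `LongSupplier` clock and the tolerance parameter are illustrative assumptions. In a Flink job the same logic would live in the `extractTimestamp` and `getCurrentWatermark` methods of a periodic watermark assigner.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Model of a bounded-out-of-orderness assigner that ignores "future" timestamps
// (ahead of processing time + maxFutureSkew) when computing the watermark.
final class FutureAwareWatermarkModel {
    private final long maxOutOfOrderness;
    private final long maxFutureSkew;   // hypothetical tolerance for clock skew, in ms
    private final LongSupplier clock;   // processing-time clock in ms, injectable for testing
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    FutureAwareWatermarkModel(long maxOutOfOrderness, long maxFutureSkew, LongSupplier clock) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.maxFutureSkew = maxFutureSkew;
        this.clock = clock;
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    // Every element keeps its own timestamp; only plausible ones may raise the maximum.
    long extractTimestamp(long eventTimestamp) {
        boolean inFuture = eventTimestamp > clock.getAsLong() + maxFutureSkew;
        if (!inFuture && eventTimestamp > currentMaxTimestamp) {
            currentMaxTimestamp = eventTimestamp;
        }
        return eventTimestamp;
    }

    // Never lets the emitted watermark go backwards.
    long getCurrentWatermark() {
        long potentialWatermark = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWatermark > lastEmittedWatermark) {
            lastEmittedWatermark = potentialWatermark;
        }
        return lastEmittedWatermark;
    }
}

final class FutureAwareWatermarkDemo {
    public static void main(String[] args) {
        AtomicLong now = new AtomicLong(100_000L); // fake wall clock
        FutureAwareWatermarkModel assigner = new FutureAwareWatermarkModel(5_000L, 1_000L, now::get);
        assigner.extractTimestamp(90_000L);
        assigner.extractTimestamp(3_700_000L); // future outlier: kept, but ignored for the watermark
        assigner.extractTimestamp(95_000L);
        System.out.println("watermark=" + assigner.getCurrentWatermark()); // watermark=90000
    }
}
```

The trade-off is that the watermark becomes tied to the wall clock of the machine running the assigner: an event stamped ahead of that clock stays in its window until ordinary events eventually push the watermark past it.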

Does this make sense? Or is there a better approach?

In general, how does Flink handle readings from the future?

Thanks,
Dominik

--
Dominik

Re: BoundedOutOfOrdernessTimestampExtractor and timestamps in the future

snntr
Hi Dominik,

out of curiosity, how come you receive timestamps from the future? ;)

Depending on the semantics of these future events, it might also make
sense to "floor" the timestamp to processing time already in the
extractTimestamp() method.
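The "floor" suggestion amounts to capping each event timestamp at the current processing time during timestamp extraction, so a future event is treated as if it had happened now. A minimal sketch, assuming a hypothetical `Reading` type with a millisecond timestamp field and an injectable clock (in a real job typically `System.currentTimeMillis()`); all class names here are illustrative, not Flink API.

```java
import java.util.function.LongSupplier;

// Hypothetical event type; only the timestamp matters here.
final class Reading {
    final String sensorId;
    final long timestampMillis;

    Reading(String sensorId, long timestampMillis) {
        this.sensorId = sensorId;
        this.timestampMillis = timestampMillis;
    }
}

// Caps ("floors" in the thread's wording) event timestamps at the current processing time.
final class CappingTimestampExtractor {
    private final LongSupplier processingClock; // e.g. System::currentTimeMillis in a real job

    CappingTimestampExtractor(LongSupplier processingClock) {
        this.processingClock = processingClock;
    }

    // Past timestamps are kept; timestamps ahead of processing time become "now".
    long extractTimestamp(Reading reading) {
        return Math.min(reading.timestampMillis, processingClock.getAsLong());
    }
}

final class CappingTimestampDemo {
    public static void main(String[] args) {
        CappingTimestampExtractor extractor = new CappingTimestampExtractor(() -> 50_000L); // fixed fake clock
        System.out.println(extractor.extractTimestamp(new Reading("sensor-1", 40_000L)));    // 40000
        System.out.println(extractor.extractTimestamp(new Reading("sensor-2", 9_000_000L))); // 50000
    }
}
```

Unlike excluding future timestamps only from the watermark calculation, this rewrites the element's timestamp, so the event lands in the window that is current at processing time rather than the one its original timestamp points to.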

I am not sure if I understand your follow-up question correctly, but
AFAIK Flink does not have a notion of future and past. Events just have
timestamps, and the general assumption is that time runs forward (at
least in the long run). "Future" events can potentially advance the
current watermark, so (event-time) windows might be closed "too early"
with respect to the rest of the events. (These events can still be
processed with "allowed lateness".) There are some sections in the
documentation that might help you
(https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html).
Depending on the particular problem, you might be able to develop a
fancy watermarking mechanism that mitigates the effect of these future
timestamps. Does this answer your question in any way? :)
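Whether a late element is still processed depends on its window and on the allowed lateness. The following simplified plain-Java model assumes tumbling event-time windows without offset and non-negative millisecond timestamps: an element is dropped once the watermark has reached the last timestamp of its window plus the allowed lateness. To my understanding this mirrors the lateness check in Flink's window operator, but treat it as an approximation; in the DataStream API the lateness itself is configured with `allowedLateness(Time)` on a windowed stream. The class and method names are hypothetical.

```java
// Simplified model of event-time lateness for tumbling windows (no offset, timestamps >= 0).
final class TumblingWindowLatenessModel {
    private final long windowSize;
    private final long allowedLateness;

    TumblingWindowLatenessModel(long windowSize, long allowedLateness) {
        this.windowSize = windowSize;
        this.allowedLateness = allowedLateness;
    }

    // Start (inclusive) of the window that contains the timestamp.
    long windowStart(long timestamp) {
        return timestamp - (timestamp % windowSize);
    }

    // Last timestamp that still belongs to the window; the window end is exclusive.
    long windowMaxTimestamp(long timestamp) {
        return windowStart(timestamp) + windowSize - 1;
    }

    // Dropped once the watermark has reached the window's max timestamp plus the allowed lateness.
    boolean isDropped(long timestamp, long currentWatermark) {
        return windowMaxTimestamp(timestamp) + allowedLateness <= currentWatermark;
    }
}

final class LatenessDemo {
    public static void main(String[] args) {
        long watermarkAfterOutlier = 3_595_000L;
        TumblingWindowLatenessModel strict = new TumblingWindowLatenessModel(10_000L, 0L);
        TumblingWindowLatenessModel lenient = new TumblingWindowLatenessModel(10_000L, 3_600_000L);
        System.out.println("strict drops 13000: " + strict.isDropped(13_000L, watermarkAfterOutlier));   // true
        System.out.println("lenient drops 13000: " + lenient.isDropped(13_000L, watermarkAfterOutlier)); // false
    }
}
```

For instance, if a far-future outlier has pushed the watermark to 3,595,000 ms, an event at 13,000 ms survives only with an allowed lateness of roughly an hour, which means keeping window state around for that long. Allowed lateness therefore helps with moderate outliers, but hardly with timestamps that are far in the future.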

Cheers,

Konstantin


On 01.11.2016 15:05, Dominik Bruhn wrote:

--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082



Re: BoundedOutOfOrdernessTimestampExtractor and timestamps in the future

Maximilian Michels
The BoundedOutOfOrdernessTimestampExtractor is not really useful if
you have outliers, because it always sets the watermark to the largest
timestamp seen so far minus the out-of-orderness bound. If your data is
of such a nature, you will have to implement a custom watermark
extractor to deal with these elements.
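One possible custom rule, offered as a hypothetical alternative rather than anything Flink ships or the thread proposed: let the watermark follow the k-th largest timestamp seen instead of the largest, so up to k-1 outliers cannot advance it on their own. Below is a minimal plain-Java sketch using a min-heap of the k largest timestamps; the class names and the `tolerateOutliers` parameter are illustrative. In Flink, this logic would be wrapped in an `AssignerWithPeriodicWatermarks` implementation.

```java
import java.util.PriorityQueue;

// Hypothetical outlier-tolerant rule: watermark = (k-th largest timestamp seen) - maxOutOfOrderness,
// with k = tolerateOutliers + 1.
final class KthLargestWatermarkModel {
    private final long maxOutOfOrderness;
    private final int k;
    // Min-heap of the k largest timestamps seen so far; its head is the k-th largest.
    private final PriorityQueue<Long> largest = new PriorityQueue<>();

    KthLargestWatermarkModel(long maxOutOfOrderness, int tolerateOutliers) {
        if (tolerateOutliers < 0) {
            throw new IllegalArgumentException("tolerateOutliers must be >= 0");
        }
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.k = tolerateOutliers + 1;
    }

    long extractTimestamp(long timestamp) {
        if (largest.size() < k) {
            largest.add(timestamp);
        } else if (timestamp > largest.peek()) {
            largest.poll();          // evict the smallest of the current top k
            largest.add(timestamp);
        }
        return timestamp;
    }

    long getCurrentWatermark() {
        if (largest.size() < k) {
            return Long.MIN_VALUE;   // fewer than k elements seen: no watermark progress yet
        }
        return largest.peek() - maxOutOfOrderness;
    }
}

final class KthLargestWatermarkDemo {
    public static void main(String[] args) {
        KthLargestWatermarkModel assigner = new KthLargestWatermarkModel(5_000L, 1);
        for (long ts : new long[] {10_000L, 12_000L, 3_600_000L, 13_000L}) {
            assigner.extractTimestamp(ts);
            System.out.println("ts=" + ts + " watermark=" + assigner.getCurrentWatermark());
        }
    }
}
```

The watermark stays non-decreasing, because the smallest of the k largest timestamps can only grow. The costs are a watermark that lags slightly behind the stream, no watermark progress until k elements have arrived, and no protection once more than k-1 outliers have been seen.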

-Max


On Tue, Nov 1, 2016 at 10:02 PM, Konstantin Knauf
<[hidden email]> wrote:
