Hey,
I'm using a BoundedOutOfOrdernessTimestampExtractor for assigning my timestamps and discarding events that are too old (which happens sometimes).

Now my problem is that some events accidentally have timestamps in the future. If the timestamps are further in the future than my `maxOutOfOrderness`, they push the watermark ahead and I end up discarding the valid events that follow. So I need a way of telling the BoundedOutOfOrdernessTimestampExtractor to exclude timestamps from the future from the watermark calculation. I still want to keep the events if they are in the future and assign them to the right watermarks.

How can I achieve this? I thought about checking whether the potential timestamp is in the future before considering it for a watermark. I cloned the BoundedOutOfOrdernessTimestampExtractor and added the idea: https://gist.github.com/theomega/090054032e0b3c3b9fb06767f0fec7e7

Does this make sense? Or is there a better approach?

In general, how does Flink handle readings from the future?

Thanks,
Dominik
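A minimal sketch of the idea described above (the gist itself is not reproduced here): a plain-Java model that tracks the maximum timestamp the way a bounded-out-of-orderness extractor does, but skips timestamps later than the current wall-clock time when updating that maximum. The class name `FutureAwareWatermarkTracker` and the injected `LongSupplier` clock are hypothetical, chosen so the logic runs without a Flink job; in a real job the same check would live in a custom `AssignerWithPeriodicWatermarks`.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical, Flink-independent model: the watermark is the maximum
 * timestamp seen so far minus maxOutOfOrderness, except that timestamps
 * later than "now" are NOT allowed to raise that maximum.
 */
class FutureAwareWatermarkTracker {
    private final long maxOutOfOrderness;
    private final LongSupplier clock; // returns "now" in epoch millis (assumption: injected for testability)
    private long currentMaxTimestamp;

    FutureAwareWatermarkTracker(long maxOutOfOrderness, LongSupplier clock) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.clock = clock;
        // Start low enough that (max - maxOutOfOrderness) cannot underflow before the first event.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    /** Called once per event; the event always keeps its original timestamp. */
    long extractTimestamp(long eventTimestamp) {
        boolean inFuture = eventTimestamp > clock.getAsLong();
        if (!inFuture && eventTimestamp > currentMaxTimestamp) {
            currentMaxTimestamp = eventTimestamp;
        }
        return eventTimestamp;
    }

    /** Never moves backwards, because currentMaxTimestamp only grows. */
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }
}
```

Future events keep their original timestamp here, so they simply wait in their window until the watermark catches up with them, while the watermark itself is driven only by non-future events.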
Hi Dominik,
Out of curiosity, how come you receive timestamps from the future? ;)

Depending on the semantics of these future events, it might also make sense to already "floor" the timestamp to processing time in the extractTimestamp() method.

I am not sure if I understand your follow-up question correctly, but afaik Flink does not have a notion of future and past. Events just have timestamps, and the general assumption is that time runs forward (at least in the long run). "Future" events can potentially advance the current watermark, so (event-time) windows might be closed "too early" w.r.t. the rest of the events. (These late events can still be processed with "allowed lateness".) There are some sections in the documentation which might help you (https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html). Depending on the particular problem, you might be able to develop a fancy watermarking mechanism which mitigates the effect of these future timestamps. Does this answer your question in any way? :)

Cheers,
Konstantin

On 01.11.2016 15:05, Dominik Bruhn wrote:
> [...]

--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Managing directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Registered office: Unterföhring * Amtsgericht München * HRB 135082
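The "flooring" suggestion can be sketched the same way. In Flink this would roughly amount to a subclass of BoundedOutOfOrdernessTimestampExtractor whose `extractTimestamp(T element)` returns something like `Math.min(eventTime, System.currentTimeMillis())`. The plain-Java model below (hypothetical class `FlooringWatermarkModel`, with an injected clock instead of `System.currentTimeMillis()`) shows the effect without a Flink dependency. Unlike the approach in the original question, the event's timestamp itself is changed, so a future event lands in the window for "now" rather than the window its original timestamp points to.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical, Flink-independent model: timestamps later than "now" are
 * clamped to "now" before the usual bounded-out-of-orderness logic runs.
 */
class FlooringWatermarkModel {
    private final long maxOutOfOrderness;
    private final LongSupplier clock; // current processing time in epoch millis (assumption: injected)
    private long currentMaxTimestamp;

    FlooringWatermarkModel(long maxOutOfOrderness, LongSupplier clock) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.clock = clock;
        // Avoid underflow in currentWatermark() before the first event arrives.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    /** Returns the timestamp actually assigned to the event (possibly floored). */
    long extractTimestamp(long eventTimestamp) {
        long assigned = Math.min(eventTimestamp, clock.getAsLong());
        currentMaxTimestamp = Math.max(currentMaxTimestamp, assigned);
        return assigned;
    }

    /** Maximum assigned timestamp minus the allowed out-of-orderness. */
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }
}
```

Because no assigned timestamp can exceed the processing time, the watermark can never run further ahead than "now minus `maxOutOfOrderness`"; the price is that the original future timestamp is lost.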
The BoundedOutOfOrdernessTimestampExtractor is not really useful if you have outliers, because it always sets the watermark to the largest timestamp seen so far minus the out-of-orderness. If your data is of such a nature, you will have to implement a custom watermark extractor to deal with these elements.

-Max

On Tue, Nov 1, 2016 at 10:02 PM, Konstantin Knauf <[hidden email]> wrote:
> [...]
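To illustrate the outlier problem, here is a simplified plain-Java model of how a bounded-out-of-orderness extractor derives its watermark (the class name `BoundedOutOfOrdernessModel` is hypothetical; the real BoundedOutOfOrdernessTimestampExtractor is a Flink `AssignerWithPeriodicWatermarks` whose watermark is polled periodically rather than on demand). A single outlier raises the maximum, and every normal event that follows sits behind the watermark.

```java
/**
 * Hypothetical, Flink-independent model: watermark = largest timestamp seen
 * so far minus maxOutOfOrderness. Every timestamp, outliers included, may
 * raise the maximum.
 */
class BoundedOutOfOrdernessModel {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;

    BoundedOutOfOrdernessModel(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Avoid underflow in currentWatermark() before the first event arrives.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    /** Records the timestamp; an outlier permanently raises the maximum. */
    long extractTimestamp(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        return eventTimestamp;
    }

    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }
}
```

With `maxOutOfOrderness` set to 5,000 ms, events at 100,000 ms, then an outlier at 1,000,000 ms, then a normal event at 101,000 ms leave the watermark at 995,000 ms, so the normal event is far behind it. A custom extractor has to decide which timestamps are allowed to move the maximum.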