Hej,
I have a stream of timestamped events I want to process in Flink streaming. Do I have to write my own policies to do so, or can I define time-based windows that use the timestamps instead of the system time?

cheers Martin
Hi Martin,
you need to implement your own policy. However, this should not be complicated. Have a look at "TimeTriggerPolicy". You just need to provide a "Timestamp" implementation that extracts your ts-attribute from the tuples.

-Matthias
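To make the suggestion concrete, here is a minimal sketch of such an extractor. The `LogEvent` type and its fields are made up for illustration, and the `Timestamp` interface is defined locally only so the snippet is self-contained; it mirrors the shape of the single-method extractor interface Matthias refers to, which in a real job would come from Flink instead.

```java
import java.io.Serializable;

public class TimestampExtractorSketch {

    // Local stand-in for Flink's Timestamp<T> extractor interface:
    // one method that pulls a long timestamp out of an element.
    interface Timestamp<T> extends Serializable {
        long getTimestamp(T value);
    }

    // Hypothetical event type carrying a millisecond timestamp.
    static class LogEvent {
        final long eventTimeMillis;
        final String payload;

        LogEvent(long eventTimeMillis, String payload) {
            this.eventTimeMillis = eventTimeMillis;
            this.payload = payload;
        }
    }

    // The extractor just returns the ts-attribute of the tuple/POJO.
    static final Timestamp<LogEvent> EXTRACTOR = event -> event.eventTimeMillis;

    public static void main(String[] args) {
        LogEvent e = new LogEvent(1_440_770_280_000L, "machine-a: started");
        System.out.println(EXTRACTOR.getTimestamp(e)); // prints 1440770280000
    }
}
```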
Hi Martin,

the answer depends, because the current windowing implementation has some problems. We are working on improving it in the 0.10 release, though.

If your elements arrive with strictly increasing timestamps and you have parallelism=1 or don't perform any re-partitioning of the data (which a groupBy() does, for example), then what Matthias proposed works for you. If not, then you can get into problems with out-of-order elements and windows will be determined incorrectly.

If you are interested in what we are working on for 0.10, please look at the design documents here https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams and here https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams. The basic idea is to make windows work correctly when elements do not arrive ordered by timestamp. For this we want to use watermarks, as popularized, for example, by Google Dataflow. Please ask if you have questions about this or are interested in joining the discussion (the design is not yet finalized, both API and implementation). :D

Cheers,
Aljoscha

P.S. I have some proof-of-concept work in a branch of mine; if you are interested in my work there I could give you access to it.
The stream consists of logs from different machines with synchronized clocks. As a result, timestamps are not strictly increasing, but there is a bound on how much out of order they can be. (One aim is to detect events that go out of order by more than a certain amount, indicating some problem in the system setup.)

I will look at the example policies and see if I can find a way to make it work with 0.9. I am aware of Google Dataflow and the discussion on Flink, though I only recently learned more about the field, so I didn't have too much useful to say. This might change once I get some more experience with the use case I'm working on.

cheers Martin
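The monitoring idea described above (flag events that arrive more than a bounded amount out of order) can be sketched without any Flink dependency: track the largest timestamp seen so far and flag any event that lags behind it by more than the allowed bound. All names here (`OutOfOrderDetector`, `maxAllowedLagMillis`) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class OutOfOrderDetector {
    private final long maxAllowedLagMillis;
    private long maxSeen = Long.MIN_VALUE;

    public OutOfOrderDetector(long maxAllowedLagMillis) {
        this.maxAllowedLagMillis = maxAllowedLagMillis;
    }

    /** Returns true if this event lags the max timestamp seen so far by more than the bound. */
    public boolean isTooLate(long timestampMillis) {
        boolean tooLate = maxSeen != Long.MIN_VALUE
                && maxSeen - timestampMillis > maxAllowedLagMillis;
        if (timestampMillis > maxSeen) {
            maxSeen = timestampMillis;
        }
        return tooLate;
    }

    public static void main(String[] args) {
        OutOfOrderDetector detector = new OutOfOrderDetector(1000); // 1s bound
        long[] timestamps = {1000, 2000, 1500, 3000, 1200};
        List<Long> flagged = new ArrayList<>();
        for (long ts : timestamps) {
            if (detector.isTooLate(ts)) {
                flagged.add(ts);
            }
        }
        // 1500 lags only 500ms behind 2000 (within bound); 1200 lags 1800ms behind 3000.
        System.out.println(flagged); // prints [1200]
    }
}
```

In a Flink job the same per-key state could live inside a map/flatMap operator upstream of the windowing.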
Hej,

I want to give TimeTriggerPolicy a try and see how much of a problem it will be in this use case. Is there an example of how to use it? I looked at the API descriptions but I'm still confused.

cheers Martin
This is actually simpler than you think; you can just use the Time.of(...) helper:
ds.window(Time.of(long windowSize, Timestamp yourTimestampExtractor, long startTime))...

Gyula
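For intuition, here is a tiny self-contained sketch (not Flink code) of what a tumbling event-time window of this shape conceptually computes, assuming timestamps are in order or only boundedly late: each element lands in the window numbered floor((ts - startTime) / windowSize). The class and method names are invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {

    // Group timestamps into tumbling windows of the given size,
    // counted from startTime.
    static Map<Long, List<Long>> assign(long[] timestamps, long windowSize, long startTime) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            long windowIndex = (ts - startTime) / windowSize;
            windows.computeIfAbsent(windowIndex, k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] ts = {0, 500, 1200, 1900, 2100};
        // windowSize = 1000 ms, startTime = 0
        System.out.println(assign(ts, 1000, 0));
        // prints {0=[0, 500], 1=[1200, 1900], 2=[2100]}
    }
}
```

The point of passing your own timestamp extractor to Time.of is precisely that the window boundaries are computed from the extracted event time rather than from the system clock.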