Hi all,
Based on the advice of Aljoscha in this thread, I’m trying to implement a ProcessFunction that simulates the original sliding window (using Flink 1.2.1, still).

My current setup is as follows for a window that is windowWidth wide and slides every windowSlide:

- Keep a ListState<Tuple2<Long, InputType>>
- processElement adds the incoming element together with context.timestamp() to the list state
- If no timer was started yet (a flag in the function instance), registerEventTimeTimer(context.timestamp() + windowWidth) and set the flag to true:

    if (!timerStarted) {
        // timestamp = context.timestamp()
        ctx.timerService().registerEventTimeTimer(timestamp + windowWidthMs);
        timerStarted = true;
    }

- onTimer gets the list, restricts it to the elements whose timestamps are at most windowWidth before currentTime, and passes them on to a wrapped function that does my ‘domain logic’. The result of that function is passed on to the collector. After that, it registers a new timer at currentTime + windowSlide.

I’m now testing this function using a LocalEnvironment and a stream (fromCollection) of increasing longs, with the timestamp being the value of the long. The topology is:

    streamOfLongs
        .map(l -> Long.toString(l)) // Needed because myProcessFunction operates on Strings. It also triggers parallel operation.
        .keyBy(x -> 0)
        .process(myProcessFunction)

What I’m now running into is that the first element passed to myProcessFunction is not always the first element in the stream (unless I set the parallelism of the operators to 1), but I need the timestamp of the first element to start off my timer chain.

Is there a way around this? The only solution I see right now is changing the onTimer call to extract all windows (from the beginning) from the state and then clean up the ListState to remove elements that are no longer part of any future window (based on the watermark). However, this feels a bit clunky, and might still lead to duplicate ‘windows’ without some extra bookkeeping.
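For concreteness, the onTimer filtering and the ListState clean-up described above could look roughly like the plain-Java sketch below. It deliberately leaves Flink out so it runs on its own: WindowBuffer, Stamped, windowContents and retainForFutureWindows are hypothetical names, Stamped stands in for Tuple2<Long, InputType>, and treating the window as the half-open interval [timerTime - windowWidth, timerTime) is an assumption, not something stated in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: models the buffered list state as a plain List.
final class WindowBuffer {

    /** A buffered value with its event timestamp (stand-in for Tuple2<Long, InputType>). */
    static final class Stamped<T> {
        final long timestamp;
        final T value;

        Stamped(long timestamp, T value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /**
     * Values that belong to the window ending at timerTime.
     * Assumption: the window is the half-open interval [timerTime - windowWidth, timerTime).
     */
    static <T> List<T> windowContents(List<Stamped<T>> buffer, long timerTime, long windowWidth) {
        long windowStart = timerTime - windowWidth;
        List<T> result = new ArrayList<>();
        for (Stamped<T> element : buffer) {
            if (element.timestamp >= windowStart && element.timestamp < timerTime) {
                result.add(element.value);
            }
        }
        return result;
    }

    /**
     * Elements that can still fall into the window ending at nextTimerTime or a later one.
     * Everything older than nextTimerTime - windowWidth is dropped.
     */
    static <T> List<Stamped<T>> retainForFutureWindows(
            List<Stamped<T>> buffer, long nextTimerTime, long windowWidth) {
        long oldestNeeded = nextTimerTime - windowWidth;
        List<Stamped<T>> kept = new ArrayList<>();
        for (Stamped<T> element : buffer) {
            if (element.timestamp >= oldestNeeded) {
                kept.add(element);
            }
        }
        return kept;
    }
}
```

Because the selection only looks at timestamps, the order in which elements were appended to the buffer does not affect which elements end up in a window.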
BTW, it seems that the ProcessFunction example in the docs (of Flink 1.3.0) also has this problem: it sets the state to refer to the last processed element, but if elements are processed in parallel they do not arrive in order, so the last processed element might not be the most recent element.

Thanks,
Carst
Hi,
To answer your first question: yes, elements can arrive out of order, and in most real-world use cases they will. Making them arrive in order can be prohibitively expensive because you have to buffer elements and then sort them when a watermark arrives. It’s possible to do this in custom user code, but Flink will probably not provide such functionality in the near future.

I think you could just set a timer for “timestamp - (timestamp % slide-size) + slide-size”; this would ensure that you always get a timer on the slide boundaries. This works well because Flink de-duplicates timers: when you repeatedly set a timer for timestamp t, you will in the end only have one timer for timestamp t.

Regarding your implementation, basing the decision on an instance field can be problematic because the user function is re-used for processing elements of different keys. State and timers are scoped to the key you specified, so it will happen that you only set a timer for one of your keys and never process the data that you stored for the other keys. As I mentioned above, you can simply always set a timer that is clamped to modulo-slide-size boundaries.

Best,
Aljoscha

> On 2. Jun 2017, at 11:26, Carst Tankink <[hidden email]> wrote:
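To make the timer suggestion in the reply concrete, here is a small stand-alone sketch of the boundary arithmetic “timestamp - (timestamp % slide-size) + slide-size”. It does not use Flink: SlideTimers, nextSlideBoundary and registeredTimers are hypothetical names, the TreeSet only imitates the timer de-duplication described above, and non-negative timestamps are assumed. In a real ProcessFunction the computed value would be passed to ctx.timerService().registerEventTimeTimer(...) for every element.

```java
import java.util.TreeSet;

// Hypothetical illustration, not Flink code: a TreeSet stands in for the timers of one key.
final class SlideTimers {

    /**
     * Next slide boundary strictly after the given timestamp.
     * Assumption: timestamp >= 0 and slideSize > 0.
     */
    static long nextSlideBoundary(long timestamp, long slideSize) {
        return timestamp - (timestamp % slideSize) + slideSize;
    }

    /**
     * Registers one timer per element. A set keeps each timestamp only once,
     * which imitates the de-duplication of timers set for the same time.
     */
    static TreeSet<Long> registeredTimers(long[] timestamps, long slideSize) {
        TreeSet<Long> timers = new TreeSet<>();
        for (long timestamp : timestamps) {
            timers.add(nextSlideBoundary(timestamp, slideSize));
        }
        return timers;
    }

    public static void main(String[] args) {
        // Elements deliberately out of order; the slide is 1000 ms.
        long[] outOfOrder = {1003L, 1001L, 1999L, 2000L, 1007L};
        System.out.println(registeredTimers(outOfOrder, 1000L)); // prints [2000, 3000]
    }
}
```

Since every element registers its own boundary, the resulting timers no longer depend on which element happens to arrive first, and no instance-level flag is needed. An element that lies exactly on a boundary (2000 here) is assigned to the next boundary (3000).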