I want to augment a POJO in the Trigger's onElement() method; specifically, I want to supply the POJO with the watermark from the TriggerContext. The sequence of execution is:
1. add() is called on the window's accumulator, and the accumulator saves a reference to the POJO.
2. onElement() is called on the Trigger.
3. The watermark is set on the POJO.
The next add() call should see the last reference, including any mutation made in step 3. That works in a local test case using LocalFlinkMiniCluster (the mutation made in onElement() is visible in the POJO on the subsequent add()), but not on a distributed cluster. My specific question is whether add() on the window's accumulator and onElement() of the trigger on that window execute inline on the same thread, or whether some serialization/deserialization or IPC causes this divergence (local versus distributed).
Regards.
An addendum: is the element reference IN in onElement(IN element, ..) of Trigger<IN, ..> the same object as the IN provided to add(IN value) of Accumulator<IN, ..>? It seems that a mutation of IN in onElement() is not visible to the accumulator that holds it as the previous element reference, even in the next invocation of add(). This happens only in distributed mode, which makes sense only if these references point to different objects.
The pipeline: .keyBy(keySelector)
The accumulator: /** This method is called before onElement of the Trigger and keeps the reference to the last IN **/
The Trigger: public class CountBasedWMAugmentationTrigger<T extends
On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi wrote:
Hi Vishal, the Trigger is not designed to augment records; it only controls when a window is evaluated. The context object of the ProcessFunction's processElement() method gives access to the current watermark and to the timestamp of a record.
2017-12-17 16:59 GMT+01:00 Vishal Santoshi:
Hello Fabian, thank you for the response. I do not think that works: to make deterministic decisions, what is needed is the watermark of the window operator, not that of an operator that precedes the window. This is doable with a ProcessWindowFunction using state, but only for non-mergeable windows. The best API option, I think, is a TimeBaseTrigger that fires on every configured progression of the watermark, plus a window implementation that materializes only the data up to that watermark (the window may hold more data, but that data has event times greater than the watermark). I am not sure there is a built-in option for that, which is why I asked for access to the window operator's current watermark, so that we can handle the "only data up to that watermark" range retrieval with a custom data structure. Regards.
On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske wrote:
For example, this would have worked perfectly if it did not complain about MergeableWindow and state. The Session class here encapsulates the trim-up-to-watermark behavior we want (a reduce call after telling it the current watermark):
public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi wrote:
The Trigger in this case would be some count-based trigger. Again, the motive is to keep the state lean, as we want to search for patterns, sorted on event time, in the incoming sessionized (and thus of nondeterministic length) stream.
On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi wrote:
I guess https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362 is where we could shape what is emitted. Again, for us it seems natural to use the watermark to materialize micro-batches with "approximate" order (and no, I am not a fan of Spark micro-batches :)). Any pointers on how we could write an implementation that allows "up to watermark" emission through a trigger on a session window would be very helpful. In essence, I believe this is crucial for any "funnel" analysis. I know I am simplifying this and there has to be more to it...
On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi wrote:
Hi Vishal, it is not guaranteed that add() and onElement() receive the same object, and even if they do, it is not guaranteed that a mutation of the object in onElement() has an effect. The object might have been serialized and stored in RocksDB. This might be more code, but it is easier to design and reason about because there is no interaction of window assigner, trigger, and window function.
2017-12-18 20:49 GMT+01:00 Vishal Santoshi:
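To illustrate why a mutation made in one place can be invisible in another once state is serialized, here is a minimal plain-Java sketch. It uses java.io serialization only as a stand-in. Flink uses its own TypeSerializers, not java.io, but the effect is assumed to be the same: reading state back yields a new instance. The class names WatermarkedEvent and StateCopyDemo are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical element type standing in for IN from the thread.
class WatermarkedEvent implements Serializable {
    private static final long serialVersionUID = 1L;
    final long timestamp;
    long watermark = Long.MIN_VALUE; // the field the trigger tried to "augment"

    WatermarkedEvent(long timestamp) { this.timestamp = timestamp; }
}

final class StateCopyDemo {
    private StateCopyDemo() {}

    // Writes the object to bytes and reads it back, roughly what happens when a
    // state backend such as RocksDB keeps serialized bytes instead of live references.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

If "stored" is the object the accumulator holds and the trigger receives roundTrip(stored), then setting copy.watermark changes only the copy. That is the local-versus-distributed divergence described at the top of the thread.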
Makes sense. I made a first stab at using a ProcessFunction. The TimerService exposed by the Context has no method to remove a timer. Is that primarily because a priority queue is the storage, and removal from a PriorityQueue is expensive? The Trigger context does expose a version with removal abilities, so I was wondering why there is this dissonance...
On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske wrote:
And that raises a further question: how performant is the timer service? I tried to peruse the architecture behind it but could not find a definite clue. Is it a scheduled service, and if so, how many threads, etc.?
On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi wrote:
That's correct. Removal of timers is not supported in ProcessFunction. I'm not sure why it is supported for Triggers. The common workaround for ProcessFunctions is to register multiple timers and keep a ValueState that stores the valid timestamp on which the onTimer() method should be executed. If you want to fire on the next watermark, another trick is to register multiple timers on (currentWatermark + 1). Since there is only one timer per timestamp (per key), there is only one timer, and it gets continuously overwritten. The timer is called when the watermark advances.
2017-12-20 22:36 GMT+01:00 Vishal Santoshi:
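Both workarounds can be sketched as a small plain-Java model for a single key. This is not Flink code. A TreeSet stands in for the timer queue, which deduplicates timers per key and timestamp. A nullable Long field stands in for the ValueState that holds the valid timestamp. The class and method names are hypothetical.

```java
import java.util.*;

// Plain-Java model (not Flink) of the "no timer deletion" workarounds for one key.
final class TimerWorkaroundModel {
    private final TreeSet<Long> timers = new TreeSet<>(); // pending timers; a set, so one per timestamp
    private Long validTimestamp;                            // models ValueState<Long>
    private long currentWatermark = Long.MIN_VALUE;

    // Workaround 1: register a timer and record it as the only one that should act.
    void registerAndMarkValid(long timestamp) {
        timers.add(timestamp);
        validTimestamp = timestamp;
    }

    // Workaround 2: re-registering currentWatermark + 1 is idempotent, so there is one timer.
    void registerOnNextWatermark() {
        timers.add(currentWatermark + 1);
    }

    int pendingTimers() { return timers.size(); }

    // Advances the watermark, fires every due timer, and returns the timestamps
    // whose onTimer() logic actually ran (stale timers fire but are ignored).
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = watermark;
        List<Long> acted = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long ts = timers.pollFirst();
            if (validTimestamp != null && ts == validTimestamp) {
                acted.add(ts);
                validTimestamp = null;
            }
        }
        return acted;
    }
}
```

Registering 10 and then 20 leaves both timers queued. When the watermark passes 25, only the timer at 20 acts, which mimics deleting the timer at 10.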
Thanks. I have a few follow-up questions regarding ProcessFunction. I think the core should take care of any synchronization issues between calls to onElement and onTimer on a keyed stream, but tests do not seem to suggest that. I have two specific questions.
1. Are calls to onElement(..) single-threaded when scoped to a key? That is, on a keyed stream, can two or more threads process more than one element of a single key at the same time? Would I have to synchronize this construction: OUT accumulator = accumulatorState.value();
2. Can calls to onTimer(..) and onElement(..) happen concurrently for the same key? I intend to clean up state, but I see NullPointerExceptions thrown in onTimer(..), and I presume it is because onElement and onTimer are executed on two separate threads, with onTimer removing the state (clear()) after another thread has registered a timer (in onElement).
if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on Race Conditions
PS. This is the full code. @Override
On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske wrote:
Hi, all calls to onElement() and onTimer() are synchronized, for all keys. Think of a single thread calling these methods. The NPE might be thrown because two timers fire one after the other without a new record being processed between the onTimer() calls. In that case, the state is cleared in the first call and is null in the second.
2017-12-23 16:36 GMT+01:00 Vishal Santoshi:
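Following that explanation, one possible fix is to treat a missing accumulator as "already emitted" inside onTimer(). Flink's ValueState.value() returns null when nothing is set and no default is given. The sketch below models that in plain Java for one key. SessionAccumulator, GuardedSessionTimer, gap, and lastModified are hypothetical names loosely following the fragment quoted earlier. This is not the poster's actual code.

```java
// Hypothetical accumulator; only the field the timer check needs.
final class SessionAccumulator {
    final long lastModified;
    SessionAccumulator(long lastModified) { this.lastModified = lastModified; }
}

// Plain-Java model of a keyed onTimer() that tolerates already-cleared state.
final class GuardedSessionTimer {
    private final long gap;
    private SessionAccumulator state; // models accumulatorState.value(); null after clear()
    private int emitted;

    GuardedSessionTimer(long gap) { this.gap = gap; }

    void processElement(long eventTime) { state = new SessionAccumulator(eventTime); }

    void onTimer(long timestamp) {
        SessionAccumulator accumulator = state;
        if (accumulator == null) {
            return; // an earlier timer already emitted and cleared this session
        }
        if (timestamp == accumulator.lastModified + gap) {
            emitted++;    // session end: emit the result
            state = null; // models accumulatorState.clear()
        }
    }

    int emittedSessions() { return emitted; }
}
```

With gap 10 and one element at 100, the timer at 110 emits and clears. A second timer at 111 (for example, a currentWatermark + 1 timer) then returns early instead of dereferencing null.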
This makes sense. Thanks.
On Sat, Dec 23, 2017 at 10:58 AM, Fabian Hueske wrote:
Dear Fabian,
Sessionization is unique in that it entails windows of dynamic length. Flink's approach is pretty simple. It creates a TimeWindow of size "gap" relative to the event time, finds an overlapping window (intersection), and creates a covering window. Each such window has a "state" associated with it, which also has to be merged when a cover window is created on the intersection of two or more incident windows. To be more precise, if Window1 spans (t1, t2), a new record creates a window (t3, t4), and t1 <= t3 <= t2, then a new window (t1, t4) is created and the associated states are merged.
In the current Window API, the states are external and accumulator based. This approach works for pretty much all cases where the aggregation is accumulative/reduced and does not depend on order, i.e., no ordered list of incoming records needs to be kept and the reduction is to a single aggregated element (think counts, min, max, etc.).
In path analysis (and other use cases), however, this approach has drawbacks. Even though our accumulator could keep an ordered list of events, that becomes unreasonable if the list is not bounded. An approach that does attempt to bound the state is to preemptively analyze paths, using the watermark as the marker that defines the subset of the state that is safe to analyze. So if we have n events in the window state and m fall before the watermark, we can safely analyze the m-subset, emitting the paths seen and reducing the cumulative state size. There are caveats, though, that I will go into later.
Unfortunately, the accumulators in the Flink window runtime do not have access to the watermark by default. This led to the following generic approach (implemented and tested):
* Use a low-level ProcessFunction, which allows access to the watermark and is definitely nearer to the guts of Flink.
* Still merge windows on intersection, but use the watermark to trigger (through timers) reductions in state. This is not very different from what Flink does, but we have more control over what to do and when to do it. Essentially, I have exposed a lifecycle method that reacts to watermark progression.
* There are essentially two timers. The first timer is the maxTimestamp() of a window, which, if there is no further mutation because of a merge etc., fires to reflect a session end. The second one is on currentWatermark + 1, which essentially calls a "reduceToWM" on each keyed window and thus on its state.
* There are two ways to short-circuit a session: 1. on session time span, 2. on session size.
* There is a safety valve to blacklist keys when it is obvious that it is a bot (again
The solution will thus preemptively push out patterns (and correct patterns) while keeping the ordered state within reasonable bounds. The incident data, of course, has to be analyzed: are the paths too large, etc.? But one has full control over how to fashion the solution.
Regards and Thanks,
Vishal
On Wed, Dec 27, 2017 at 10:41 AM, Vishal Santoshi wrote:
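The merge-on-intersection step described above can be sketched in plain Java. This is a model in the spirit of Flink's session-window merging, not Flink's TimeWindow class. Each record gets a window [eventTime, eventTime + gap). Windows are sorted by start, and every run of intersecting windows is replaced by its cover. In general the cover is (min start, max end), so it is (t1, t4) when t4 >= t2 and (t1, t2) when a late record's window ends earlier. Span and SessionMerger are hypothetical names.

```java
import java.util.*;

// Minimal stand-in for a time window [start, end); not Flink's TimeWindow.
final class Span {
    final long start, end;
    Span(long start, long end) { this.start = start; this.end = end; }

    // Overlapping or touching windows count as intersecting.
    boolean intersects(Span other) { return start <= other.end && end >= other.start; }

    // Smallest window covering both: (min start, max end).
    Span cover(Span other) {
        return new Span(Math.min(start, other.start), Math.max(end, other.end));
    }

    @Override public boolean equals(Object o) {
        return o instanceof Span && ((Span) o).start == start && ((Span) o).end == end;
    }
    @Override public int hashCode() { return Objects.hash(start, end); }
    @Override public String toString() { return "[" + start + ", " + end + ")"; }
}

final class SessionMerger {
    // One window per record, sized by the session gap.
    static Span windowFor(long eventTime, long gap) { return new Span(eventTime, eventTime + gap); }

    // Sorts by start and collapses each run of intersecting windows into its cover.
    static List<Span> merge(List<Span> windows) {
        List<Span> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong(w -> w.start));
        List<Span> merged = new ArrayList<>();
        for (Span w : sorted) {
            int last = merged.size() - 1;
            if (last >= 0 && merged.get(last).intersects(w)) {
                merged.set(last, merged.get(last).cover(w));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }
}
```

With gap 10 and records at 0, 5, and 30, this yields [0, 15) and [30, 40). In the real operator, the per-window states of each merged run would be merged at the same point.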
Hi Vishal, thanks for sharing your solution! In that case you would simply need a custom trigger that calls the ProcessWindowFunction when a new watermark arrives. For intermediate calls, you just FIRE, and for the final call you FIRE_AND_PURGE to remove the elements from the window's state. Did you try that? Best, Fabian
2018-01-03 15:57 GMT+01:00 Vishal Santoshi:
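The firing policy suggested here can be modeled as a small decision function. This is a plain-Java sketch, not a Flink Trigger. WindowAction only mirrors the names of Flink's TriggerResult values. In a real Trigger, one would presumably register an event-time timer at ctx.getCurrentWatermark() + 1 and re-register it on each firing. Every watermark that advances but is still before the window's maxTimestamp yields FIRE, and the first watermark at or past maxTimestamp yields FIRE_AND_PURGE. WatermarkFiringPolicy is a hypothetical name.

```java
// Stand-in for the outcomes discussed above; names mirror Flink's TriggerResult.
enum WindowAction { CONTINUE, FIRE, FIRE_AND_PURGE }

// Plain-Java model of "FIRE on each new watermark, FIRE_AND_PURGE at the end".
final class WatermarkFiringPolicy {
    private final long windowMaxTimestamp; // e.g. the window's maxTimestamp()
    private long lastWatermark = Long.MIN_VALUE;

    WatermarkFiringPolicy(long windowMaxTimestamp) { this.windowMaxTimestamp = windowMaxTimestamp; }

    WindowAction onWatermark(long watermark) {
        if (watermark >= windowMaxTimestamp) {
            return WindowAction.FIRE_AND_PURGE; // final evaluation, drop window contents
        }
        if (watermark > lastWatermark) {
            lastWatermark = watermark;
            return WindowAction.FIRE;           // intermediate evaluation, keep contents
        }
        return WindowAction.CONTINUE;           // watermark did not advance
    }
}
```

As the next message points out, knowing that the watermark advanced is not enough by itself. The function that handles FIRE also needs the watermark value in order to select the elements before it.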
Hello Fabian, thank you for your response. What is required is that, on a new watermark, we:
* sort these Session objects, and
* get the subset that lies before the new watermark and emit it without purging.
I do not see how the Trigger approach helps us. It does tell us that the watermark has progressed, but to get the subset of the ListState that falls before the watermark, we would need access to the new value of the watermark. That was my initial query.
public class SessionProcessWindow<IN extends HasTime & HasKey, OUT extends SessionState<IN, OUT>> extends ProcessWindowFunction<IN, OUT, String, TimeWindow> {
On Fri, Jan 5, 2018 at 7:35 AM, Fabian Hueske wrote:
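The "sort, then take the subset before the watermark" step (the "reduceToWM" mentioned earlier in the thread) can be sketched in plain Java. It assumes the usual watermark meaning: a watermark w promises that no more elements with timestamp <= w will arrive, so those elements are safe to analyze. In Flink, the value of w could come from a ProcessFunction via ctx.timerService().currentWatermark(). TimedEvent and WatermarkSplitter are hypothetical names, and the buffer must be a mutable list.

```java
import java.util.*;

// Hypothetical buffered element; stands in for IN extends HasTime.
final class TimedEvent {
    final long time;
    final String page;
    TimedEvent(long time, String page) { this.time = time; this.page = page; }
}

final class WatermarkSplitter {
    private WatermarkSplitter() {}

    // Sorts the buffer by event time, removes the prefix with time <= watermark,
    // and returns it for emission; later events stay buffered for the next watermark.
    static List<TimedEvent> drainUpToWatermark(List<TimedEvent> buffer, long watermark) {
        buffer.sort(Comparator.comparingLong(e -> e.time));
        int cut = 0;
        while (cut < buffer.size() && buffer.get(cut).time <= watermark) {
            cut++;
        }
        List<TimedEvent> ready = new ArrayList<>(buffer.subList(0, cut));
        buffer.subList(0, cut).clear(); // shrink the retained state
        return ready;
    }
}
```

Sorting on every call is the simplest choice. A structure kept in event-time order would avoid re-sorting, at the cost of more code.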
Hi, you would not need the ListStateDescriptor. A ProcessWindowFunction stores all events that are assigned to a window (IN objects in your case) in an internal ListState. The Iterable<IN> parameter of the process() method iterates over that internal list state.
2018-01-05 17:28 GMT+01:00 Vishal Santoshi:
Yep, though this is suboptimal, as you imagined. Two things:
* <IN> internally has an <INLite>, an ultra-light version of IN that is only required for the path analysis.
* Sessionization being expensive, we piggyback multiple other aggregations that do not depend on the path or order (count, etc.). Essentially, a Session is (ordered path + accumulated stats).
The code seems pretty all right; please tell me if you need to see it. It is all generics, so there are no secrets here.
On Fri, Jan 5, 2018 at 11:58 AM, Fabian Hueske wrote:
I think I got it. Glad you solved this tricky issue, and thanks for sharing your solution :-)
2018-01-06 14:33 GMT+01:00 Vishal Santoshi: