A question about Triggers

Vishal Santoshi
I want to augment a POJO in the Trigger's onElement() method; specifically, I want to supply the POJO with the watermark from the TriggerContext. The sequence of execution is:

1. add() is called on the window's accumulator, which saves a reference to the POJO.
2. onElement() is called on the Trigger.
3. The watermark is set on the POJO.

The next call to add() should see that last reference, including any mutation made in step 3.

This works in a local test case using LocalFlinkMiniCluster (the mutation made in onElement() is visible in the POJO on the subsequent add()), but not on a distributed cluster. My specific question is whether add() on the window's accumulator and the trigger's onElement() for that window are executed inline on the same thread, or whether some serialization/deserialization or IPC causes this divergence between local and distributed execution.

Regards.

Re: A question about Triggers

Vishal Santoshi
An addendum:

Is the element reference IN in onElement(IN element, ...) of Trigger<IN, ...> the same object as the IN provided to add(IN value) in Accumulator<IN, ...>? It seems that mutations to IN made in onElement() are not visible to the accumulator that holds it as the previous element reference, even in the next invocation of add(). This seems to happen only in distributed mode, which would make sense only if these references point to different objects.

The pipeline:

    .keyBy(keySelector)
    .window(EventTimeSessionWindows.<IN>withGap(gap))
    .trigger(new CountBasedWMAugmentationTrigger<IN, TimeWindow>(triggerCount))
    .aggregate(
        new AggregateFunction<IN, ACC, OUT>() {

            @Override
            public ACC createAccumulator() {
                ACC newInstance = (ACC) accumulator.clone();
                newInstance.resetLocal();
                return newInstance;
            }

            @Override
            public void add(IN value, ACC accumulator) {
                /** This method is called before onElement of the Trigger and keeps the reference to the last IN **/
                accumulator.add(value);
            }
            .....

The Trigger:

    public class CountBasedWMAugmentationTrigger<T extends Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window>
            extends Trigger<T, W> {

        @Override
        public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
            /** The element T is mutated to carry the watermark **/
            element.setWaterMark(ctx.getCurrentWatermark());
            .....

Re: A question about Triggers

Fabian Hueske
Hi Vishal,

The Trigger is not designed to augment records; it only controls when a window is evaluated.
I would recommend using a ProcessFunction to enrich records with the current watermark before passing them into the window operator.
The context object of the processElement() method gives access to the current watermark and the timestamp of a record.
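As a rough illustration of the enrich-before-the-window idea, here is a plain-Java model with no Flink dependency. The names Enriched, WatermarkEnricher and EnrichDemo are hypothetical; in an actual ProcessFunction the watermark would come from ctx.timerService().currentWatermark() and the timestamp from ctx.timestamp(). The point is that the watermark is copied into a new immutable record once, upstream of the window, instead of being mutated into an element the window operator may already have stored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical immutable enriched record: the watermark is attached once,
// when the record is created, instead of being mutated later in a Trigger.
final class Enriched<T> {
    final T value;
    final long timestamp;
    final long watermarkAtIngest;

    Enriched(T value, long timestamp, long watermarkAtIngest) {
        this.value = value;
        this.timestamp = timestamp;
        this.watermarkAtIngest = watermarkAtIngest;
    }
}

// Hypothetical plain-Java stand-in for a ProcessFunction placed in front of the
// window operator. In Flink, the watermark would come from
// ctx.timerService().currentWatermark() and the timestamp from ctx.timestamp();
// here both are supplied by the caller.
class WatermarkEnricher<T> {
    private long currentWatermark = Long.MIN_VALUE; // no watermark seen yet

    void onWatermark(long watermark) {
        // Watermarks only move forward.
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    Enriched<T> processElement(T value, long timestamp) {
        return new Enriched<>(value, timestamp, currentWatermark);
    }
}

class EnrichDemo {
    public static void main(String[] args) {
        WatermarkEnricher<String> enricher = new WatermarkEnricher<>();
        List<Enriched<String>> out = new ArrayList<>();
        out.add(enricher.processElement("click", 100L));
        enricher.onWatermark(150L);
        out.add(enricher.processElement("view", 160L));
        for (Enriched<String> e : out) {
            System.out.println(e.value + " wm=" + e.watermarkAtIngest);
        }
    }
}
```

Keep in mind that the value attached this way is the watermark of the enriching operator at the moment the record passed through it, not the watermark of the window operator.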

Please note that watermarks are not deterministic; they may depend on the order in which an operator consumes its parallel inputs.
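A small plain-Java model of why this is so, assuming (as the Flink documentation describes) that an operator with several inputs uses the minimum of its inputs' watermarks as its own event time. MultiInputWatermark and OrderDemo are hypothetical names. The same record, processed on channel 0, sees a different watermark depending only on whether channel 1's watermark is consumed before or after it.

```java
import java.util.Arrays;

// Hypothetical model of an operator with several parallel input channels.
// Its watermark is the minimum over the latest watermark seen on each channel.
class MultiInputWatermark {
    private final long[] channelWatermarks;
    private long current = Long.MIN_VALUE;

    MultiInputWatermark(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        current = Math.max(current, min); // the operator's watermark never goes backwards
    }

    long current() {
        return current;
    }
}

class OrderDemo {
    // Same inputs, two consumption orders: does channel 1's watermark arrive
    // before or after the record that is processed on channel 0?
    static long watermarkSeenByRecord(boolean channel1WatermarkFirst) {
        MultiInputWatermark op = new MultiInputWatermark(2);
        op.onWatermark(0, 100L);
        if (channel1WatermarkFirst) {
            op.onWatermark(1, 100L);
        }
        long seen = op.current(); // the record is processed at this point
        if (!channel1WatermarkFirst) {
            op.onWatermark(1, 100L);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(watermarkSeenByRecord(true));  // 100
        System.out.println(watermarkSeenByRecord(false)); // -9223372036854775808 (Long.MIN_VALUE)
    }
}
```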

Best, Fabian

Re: A question about Triggers

Vishal Santoshi
Hello Fabian, thank you for the response.

I think that does not work, since what we need in order to make deterministic decisions is the watermark of the window operator, not that of an operator preceding the window. This is doable with a ProcessWindowFunction using state, but only for non-mergeable windows.

The best API option, I think, would be a time-based trigger that fires every time the watermark progresses by a configured amount, together with a window implementation that materializes only the data up to that watermark (the window might hold more data, but that data has event times greater than the watermark). I am not sure there is a built-in option for this, which is why I was asking for access to the window operator's current watermark, so that we can handle the "only data up to that watermark" range retrieval with a custom data structure.
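A minimal sketch of the "only data up to that watermark" retrieval, as plain Java with no Flink dependency. TimedEvent, WatermarkBoundedBuffer and BufferDemo are hypothetical names; in a real job the buffer would live in keyed state and advanceTo() would be called when a trigger or timer fires, neither of which is modeled here. Events are kept in a priority queue ordered by event time, so each call releases, in order, exactly the events at or below the new watermark and retains the rest.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical event: only the event-time timestamp matters here.
class TimedEvent {
    final String name;
    final long eventTime;

    TimedEvent(String name, long eventTime) {
        this.name = name;
        this.eventTime = eventTime;
    }
}

// Buffers events ordered by event time. When the watermark advances, only
// events with eventTime <= watermark are released; later ones stay buffered.
class WatermarkBoundedBuffer {
    private final PriorityQueue<TimedEvent> pending =
            new PriorityQueue<>(Comparator.comparingLong((TimedEvent e) -> e.eventTime));

    void add(TimedEvent e) {
        pending.add(e);
    }

    // Would be driven by a watermark-based trigger or timer in a real job.
    List<TimedEvent> advanceTo(long watermark) {
        List<TimedEvent> ready = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek().eventTime <= watermark) {
            ready.add(pending.poll());
        }
        return ready;
    }

    int pendingCount() {
        return pending.size();
    }
}

class BufferDemo {
    public static void main(String[] args) {
        WatermarkBoundedBuffer buf = new WatermarkBoundedBuffer();
        buf.add(new TimedEvent("c", 30));
        buf.add(new TimedEvent("a", 10));
        buf.add(new TimedEvent("d", 50));
        buf.add(new TimedEvent("b", 20));
        for (TimedEvent e : buf.advanceTo(30)) {
            System.out.print(e.name + " ");                 // a b c
        }
        System.out.println("| still pending: " + buf.pendingCount()); // 1
    }
}
```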

Regards.

Re: A question about Triggers

Vishal Santoshi
For example, this would have worked perfectly if it did not complain about MergeableWindow and state. The Session class here encapsulates the trim-up-to-watermark behavior we want (the reduce() call after telling it the current watermark):

    public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {

        private static final ValueStateDescriptor<Session> sessionState =
                new ValueStateDescriptor<>("session", Session.class);

        @Override
        public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {
            ValueState<Session> session = context.windowState().getState(sessionState);
            Session s = session.value() != null ? session.value() : new Session();
            for (Event e : elements) {
                s.add(e);
            }
            s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
            s.reduce();
            out.collect(s);
            session.update(s);
        }

        public void clear(Context context) {
            ValueState<Session> session = context.windowState().getState(sessionState);
            session.clear();
        }
    }
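For context on the MergeableWindow complaint: session windows are merging windows. Each element initially gets its own window [t, t + gap), and windows that overlap (in this sketch, also windows that just touch) are merged into one session, so state attached to the smaller windows would have to be combined when they merge, which is presumably why per-window state is rejected for them. The plain-Java sketch below uses hypothetical Interval and SessionMerging names and shows only how timestamps merge into sessions; it sorts the whole input up front, whereas a streaming operator merges incrementally as elements arrive.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical half-open interval [start, end), standing in for a session window.
class Interval {
    final long start;
    final long end;

    Interval(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    public String toString() {
        return "[" + start + ", " + end + ")";
    }
}

class SessionMerging {
    // Each timestamp ts opens a window [ts, ts + gap); windows that overlap
    // or touch are merged into a single session.
    static List<Interval> sessions(List<Long> timestamps, long gap) {
        List<Long> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);
        List<Interval> result = new ArrayList<>();
        for (long ts : sorted) {
            Interval w = new Interval(ts, ts + gap);
            if (!result.isEmpty() && result.get(result.size() - 1).end >= w.start) {
                Interval last = result.remove(result.size() - 1);
                result.add(new Interval(last.start, Math.max(last.end, w.end)));
            } else {
                result.add(w);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> ts = new ArrayList<>();
        Collections.addAll(ts, 1L, 3L, 20L, 4L);
        System.out.println(sessions(ts, 5L)); // [[1, 9), [20, 25)]
    }
}
```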

Re: A question about Triggers

Vishal Santoshi
The Trigger in this case would be some count-based trigger. Again, the motive is to keep the state lean, as we want to search for patterns, sorted on event time, in the incoming sessionized (and thus nondeterministic-length) stream.
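A small illustration of why the sort on event time matters for this kind of pattern search, written as plain Java with hypothetical Step and Funnel names and a deliberately simple funnel definition (the number of stages completed in order). Scanning the same three events in arrival order versus event-time order gives different answers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical event: an action name plus its event-time timestamp.
class Step {
    final String action;
    final long eventTime;

    Step(String action, long eventTime) {
        this.action = action;
        this.eventTime = eventTime;
    }
}

class Funnel {
    // Number of funnel stages completed in order when scanning events as given.
    // Actions that are not the next expected stage are skipped.
    static int stagesReached(List<Step> events, List<String> stages) {
        int next = 0;
        for (Step s : events) {
            if (next < stages.size() && s.action.equals(stages.get(next))) {
                next++;
            }
        }
        return next;
    }

    // Same check after sorting a copy of the events by event time.
    static int stagesReachedByEventTime(List<Step> events, List<String> stages) {
        List<Step> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.comparingLong((Step s) -> s.eventTime));
        return stagesReached(sorted, stages);
    }

    public static void main(String[] args) {
        // Arrival order differs from event-time order.
        List<Step> arrived = Arrays.asList(new Step("cart", 20), new Step("view", 10), new Step("buy", 30));
        List<String> stages = Arrays.asList("view", "cart", "buy");
        System.out.println(stagesReached(arrived, stages));            // 1
        System.out.println(stagesReachedByEventTime(arrived, stages)); // 3
    }
}
```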

Re: A question about Triggers

Vishal Santoshi
I guess https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362

is where we could shape what is emitted. Again, for us it seems natural to use the watermark to materialize micro-batches with "approximate" order (and no, I am not a fan of Spark micro-batches :)). Any pointers on how we could write an implementation that allows "up-to-watermark emission" through a trigger on a session window would be very helpful. In essence, I believe this is crucial for any "funnel" analysis.


I know I am simplifying this and there has to be more to it...

Re: A question about Triggers

Fabian Hueske
Hi Vishal,

It is not guaranteed that add() and onElement() receive the same object, and even if they do, it is not guaranteed that a mutation of the object in onElement() has any effect. The object might have been serialized and stored in RocksDB.
Hence, elements should not be modified in onElement().
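To see how a mutation can be visible locally yet lost on a cluster, here is a plain-Java model with no Flink dependency. HeapListStore and SerializingListStore are hypothetical stand-ins: the first keeps object references, the way an on-heap state backend can; the second stores serialized bytes, the way RocksDB does. The sketch only reproduces the effect described above (store the element, then mutate it); it is not a model of Flink's actual window operator, and which backend the local and distributed setups used is an assumption the thread does not confirm.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical POJO standing in for the element type from the thread.
class SampleEvent implements Serializable {
    private static final long serialVersionUID = 1L;
    long waterMark = Long.MIN_VALUE; // mutable field, like setWaterMark() in the thread
}

// Hypothetical minimal "state backend": append an element, read all back.
interface ListStore {
    void add(SampleEvent e);
    List<SampleEvent> get();
}

// Keeps object references (similar in spirit to an on-heap backend).
class HeapListStore implements ListStore {
    private final List<SampleEvent> items = new ArrayList<>();
    public void add(SampleEvent e) { items.add(e); }
    public List<SampleEvent> get() { return items; }
}

// Serializes on write and deserializes on read (similar in spirit to RocksDB).
class SerializingListStore implements ListStore {
    private final List<byte[]> items = new ArrayList<>();

    public void add(SampleEvent e) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(e);
            out.flush();
            items.add(bytes.toByteArray());
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public List<SampleEvent> get() {
        List<SampleEvent> result = new ArrayList<>();
        for (byte[] b : items) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(b))) {
                result.add((SampleEvent) in.readObject());
            } catch (IOException | ClassNotFoundException ex) {
                throw new IllegalStateException(ex);
            }
        }
        return result;
    }
}

class MutationVisibilityDemo {
    // Mimics the thread's sequence: store the element (add), then mutate it (onElement).
    static long storedWaterMark(ListStore store) {
        SampleEvent e = new SampleEvent();
        store.add(e);      // element goes into window state
        e.waterMark = 42L; // mutation after it was stored
        return store.get().get(0).waterMark;
    }

    public static void main(String[] args) {
        System.out.println(storedWaterMark(new HeapListStore()));        // 42
        System.out.println(storedWaterMark(new SerializingListStore())); // -9223372036854775808
    }
}
```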

Have you considered implementing the operation completely in a ProcessFunction instead of a session window?
This might be more code, but it is easier to design and reason about because there is no interaction between window assigner, trigger, and window function.
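Following that suggestion, here is a plain-Java model of what such an operator could do, with no Flink dependency. SessionizingProcessModel and KeyedEvent are hypothetical names; a HashMap stands in for keyed state and onWatermark() stands in for event-time timers. On each watermark it emits every key's buffered events up to the watermark, in event-time order, and drops a key's state once the watermark reaches lastEventTime + gap (a simplified session-end rule), which keeps the retained state bounded.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical input record: a key plus an event-time timestamp.
class KeyedEvent {
    final String key;
    final long eventTime;

    KeyedEvent(String key, long eventTime) {
        this.key = key;
        this.eventTime = eventTime;
    }
}

// Plain-Java model of a keyed ProcessFunction that does its own sessionization.
// A HashMap stands in for keyed state and onWatermark() for event-time timers.
class SessionizingProcessModel {
    private static final class SessionState {
        final PriorityQueue<KeyedEvent> buffer =
                new PriorityQueue<>(Comparator.comparingLong((KeyedEvent e) -> e.eventTime));
        long lastEventTime = Long.MIN_VALUE;
    }

    private final long gap;
    private final Map<String, SessionState> state = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    SessionizingProcessModel(long gap) {
        this.gap = gap;
    }

    void processElement(KeyedEvent e) {
        SessionState s = state.computeIfAbsent(e.key, k -> new SessionState());
        s.buffer.add(e);
        s.lastEventTime = Math.max(s.lastEventTime, e.eventTime);
    }

    void onWatermark(long watermark) {
        Iterator<Map.Entry<String, SessionState>> it = state.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, SessionState> entry = it.next();
            SessionState s = entry.getValue();
            // Emit, in event-time order, every buffered event at or below the watermark.
            List<String> batch = new ArrayList<>();
            while (!s.buffer.isEmpty() && s.buffer.peek().eventTime <= watermark) {
                batch.add(Long.toString(s.buffer.poll().eventTime));
            }
            if (!batch.isEmpty()) {
                output.add(entry.getKey() + ": " + String.join(" ", batch));
            }
            // Simplified session end: no event within `gap` after the last one.
            if (watermark >= s.lastEventTime + gap) {
                output.add(entry.getKey() + ": session closed");
                it.remove(); // drop the key's state so it does not grow forever
            }
        }
    }

    List<String> output() {
        return output;
    }

    int openSessions() {
        return state.size();
    }
}
```

For example, with a gap of 10 and events at 5, 2 and 12 for key u1, a watermark of 6 emits "u1: 2 5"; a later watermark of 30 emits "u1: 12" and then closes the session.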


2017-12-18 20:49 GMT+01:00 Vishal Santoshi <[hidden email]>:
I guess https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362

is where We could fashion as to what is emitted. Again for us it seems natural to use WM to materialize a micro batches with "approximate" order ( and no I am not a fan of spark micro batches :)). Any pointers as to how we could write an implementation that allows for "up till WM emission" through a trigger on a Session Window would be very helpful. In essence I believe that for any "funnel" analysis it is crucial. 


I know I am simplifying this and there has to be more to it...




On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <[hidden email]> wrote:
The Trigger in this case would be some CountBased Trigger.... Again the motive is the keep the state lean as we desire to search for  patterns, sorted on even time,  in the incoming sessionized ( and thus of un deterministic length ) stream....

On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <[hidden email]> wrote:
For example, this would have worked perfect if it did not complain about MergeableWindow and state. The Session class in this encapsulates the  trim up to watermark behavior ( reduce call after telling it the current WM )  we desire 

public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {

private static final ValueStateDescriptor<Session> sessionState = new ValueStateDescriptor<>("session", Session.class);

@Override
public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {

ValueState<Session> session = context.windowState().getState(sessionState);
Session s = session.value() != null ? session.value() : new Session();
for (Event e : elements) {
s.add(e);
}
s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
s.reduce();
out.collect(s);
session.update(s);
}

@Override
public void clear(Context context){
ValueState<Session> session = context.windowState().getState(sessionState);
session.clear();
}
}



On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <[hidden email]> wrote:
Hello Fabian, Thank you for the response.
 
 I think that does not work, as it is the WM of the Window Operator is what is desired to make deterministic decisions rather than off an operator the precedes the Window ? This is doable using ProcessWindowFunction using state but only in the case of non mergeable windows. 

   The best API  option I think is a TimeBaseTrigger that fires every configured time progression of WM  and a Window implementation that materializes only data up till that WM ( it might have more data but that data has event time grater than the WM ). I am not sure we have that built in option and thus was asking for an access the current WM for the window operator to allow  us handle the "only data up till that WM" range retrieval using some  custom data structure. 

Regards.




On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <[hidden email]> wrote:
Hi Vishal,

the Trigger is not designed to augment records but just to control when a window is evaluated.
I would recommend to use a ProcessFunction to enrich records with the current watermark before passing them into the window operator.
The context object of the processElement() method gives access to the current watermark and timestamp of a record.

Please note that watermarks are not deterministic but may depend on the order in which parallel inputs are consumed by an operator.

Best, Fabian

2017-12-17 16:59 GMT+01:00 Vishal Santoshi <[hidden email]>:
An addendum 

Is the element reference IN  in onElement(IN element.. ) in Trigger<IN,..>, the same as IN the one provided to add(IN value) in Accumulator<IN,..>. It seems that any mutations to IN in the onElement() is not visible to the Accumulator that is carrying it as a previous element  reference albeit in the next invocation of add(). This seems to be only in distributed mode, which makes sense only if theses reference point to different objects.

The pipeline:

    .keyBy(keySelector)
    .window(EventTimeSessionWindows.<IN>withGap(gap))
    .trigger(new CountBasedWMAugmentationTrigger<IN, TimeWindow>(triggerCount))
    .aggregate(
        new AggregateFunction<IN, ACC, OUT>() {

            @Override
            public ACC createAccumulator() {
                ACC newInstance = (ACC) accumulator.clone();
                newInstance.resetLocal();
                return newInstance;
            }

            @Override
            public void add(IN value, ACC accumulator) {
                /** This method is called before onElement of the Trigger and keeps the reference to the last IN **/
                accumulator.add(value);
            }
            .....

The Trigger:

public class CountBasedWMAugmentationTrigger<T extends Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window>
        extends Trigger<T, W> {

    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        /** The element T is mutated to carry the watermark **/
        element.setWaterMark(ctx.getCurrentWatermark());

.




On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <[hidden email]> wrote:
I want to augment a POJO in the Trigger's onElement() method, specifically to supply the POJO with the watermark from the TriggerContext. The sequence of execution is:

1. Call to add() in the accumulator for the window, which saves the POJO reference in the accumulator.
2. Call to onElement() on the Trigger.
3. Set the watermark on the POJO.

The next add() call should see the last reference, including any mutation done in step 3.

That works in a local test case using LocalFlinkMiniCluster (the mutation made in onElement() is visible on the POJO in the subsequent add()), but not on a distributed cluster. My specific question is whether add() on the window's accumulator and onElement() of the trigger on that window are inline executions on the same thread, or whether some serialization/deserialization or IPC causes this divergence (local versus distributed).

Regards.








Re: A question about Triggers

Vishal Santoshi
Makes sense. I took a first stab at using a ProcessFunction. The TimerService exposed by the Context does not have a method to remove a timer. Is that primarily because timers are stored in a priority queue, and removing from a PriorityQueue is expensive? The TriggerContext does expose timer removal (deleteEventTimeTimer() and deleteProcessingTimeTimer()), so I was wondering about this inconsistency.

On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <[hidden email]> wrote:
Hi Vishal,

it is not guaranteed that add() and onElement() receive the same object, and even if they do it is not guaranteed that a mutation of the object in onElement() has an effect. The object might have been serialized and stored in RocksDB.
Hence, elements should not be modified in onElement().
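A toy model of why the mutation can vanish, assuming a state backend that keeps only serialized bytes (as a RocksDB-backed one does): every read deserializes a fresh copy, so mutating the original object after it was added changes nothing in state. Java serialization stands in for Flink's type serializers here, and all class names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class SerializedStateDemo {
    // Hypothetical element type with a mutable watermark field.
    static class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        long waterMark = -1;
    }

    // Stand-in for a state backend that stores only serialized bytes.
    static class BytesStore {
        private final List<byte[]> entries = new ArrayList<>();

        void add(Serializable value) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            entries.add(bytes.toByteArray()); // snapshot taken here
        }

        Object get(int index) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(entries.get(index)))) {
                return in.readObject(); // a fresh object on every read
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        BytesStore store = new BytesStore();
        Event e = new Event();
        store.add(e);     // like add(): the element is copied into state
        e.waterMark = 42; // like onElement(): mutates only the caller's reference
        Event stored = (Event) store.get(0);
        System.out.println(stored.waterMark); // -1
        System.out.println(stored == e);      // false
    }
}
```

With an in-memory heap backend the accumulator may simply hold the same reference, which would plausibly explain why the LocalFlinkMiniCluster test appeared to work; that is an implementation detail, not a guarantee.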

Have you considered implementing the operation entirely in a ProcessFunction instead of a session window?
This might be more code, but it is easier to design and reason about because there is no interplay between window assigner, trigger, and window function.


2017-12-18 20:49 GMT+01:00 Vishal Santoshi <[hidden email]>:
I guess https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362

is where we could shape what gets emitted. Again, for us it seems natural to use the WM to materialize micro-batches with "approximate" order (and no, I am not a fan of Spark micro-batches :)). Any pointers on how we could write an implementation that allows "up to WM" emission through a trigger on a session window would be very helpful. In essence, I believe this is crucial for any "funnel" analysis.


I know I am simplifying this and there has to be more to it...




On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <[hidden email]> wrote:
The Trigger in this case would be some count-based trigger. Again, the motive is to keep the state lean, as we want to search for patterns, sorted on event time, in the incoming sessionized (and thus of nondeterministic length) stream.

On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <[hidden email]> wrote:
For example, this would have worked perfectly if it did not complain about MergeableWindow and state. The Session class here encapsulates the trim-up-to-watermark behavior we want (a reduce() call after telling it the current WM):

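// Flink imports; Event and Session are the application's own classes (not shown)
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {

    private static final ValueStateDescriptor<Session> sessionState =
            new ValueStateDescriptor<>("session", Session.class);

    @Override
    public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {
        // per-window state: this is the part that fails with merging (session) windows
        ValueState<Session> session = context.windowState().getState(sessionState);
        Session s = session.value() != null ? session.value() : new Session();
        for (Event e : elements) {
            s.add(e);
        }
        s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
        s.reduce(); // trims the session up to the current watermark
        out.collect(s);
        session.update(s);
    }

    @Override
    public void clear(Context context) {
        ValueState<Session> session = context.windowState().getState(sessionState);
        session.clear();
    }
}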




Re: A question about Triggers

Vishal Santoshi
That raises a further question: how performant is the TimerService? I tried to go through the architecture behind it but could not find a definite clue. Is it a scheduled service, and if so, how many threads does it use?

On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <[hidden email]> wrote:
Makes sense. I took a first stab at using a ProcessFunction. The TimerService exposed by the Context does not have a method to remove a timer. Is that primarily because timers are stored in a priority queue, and removing from a PriorityQueue is expensive? The TriggerContext does expose timer removal (deleteEventTimeTimer() and deleteProcessingTimeTimer()), so I was wondering about this inconsistency.


Re: A question about Triggers

Fabian Hueske-2
That's correct. Removal of timers is not supported in ProcessFunction. Not sure why this is supported for Triggers.
The common workaround for ProcessFunctions is to register multiple timers and have a ValueState that stores the valid timestamp on which the onTimer method should be executed.
When a timer fires and calls onTimer(), the method first checks whether the timestamp is the correct one and leaves the method if that is not the case.
If you want to fire on the next watermark, another trick is to register multiple timers on (currentWatermark + 1). Since there is only one timer per key and timestamp, there is effectively a single timer that keeps being re-registered, and it is called when the watermark advances.

On the performance of the timer service: AFAIK, all methods that work with some kind of timer use this service, so there is not much choice.

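A plain-Java model of the workaround described above, not Flink's timer service: timers live in a per-key sorted set, so registering the same timestamp twice leaves a single timer, and a per-key map stands in for the ValueState holding the one timestamp onTimer() should act on. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class TimerWorkaroundDemo {
    // Registered event-time timers per key; a set, so duplicate timestamps collapse into one timer.
    private final Map<String, TreeSet<Long>> timers = new HashMap<>();
    // Stand-in for the ValueState that stores the only timestamp that should act.
    private final Map<String, Long> validTimestamp = new HashMap<>();
    final List<String> fired = new ArrayList<>();

    void registerEventTimeTimer(String key, long timestamp) {
        timers.computeIfAbsent(key, k -> new TreeSet<>()).add(timestamp);
    }

    int timerCount(String key) {
        TreeSet<Long> set = timers.get(key);
        return set == null ? 0 : set.size();
    }

    // Instead of deleting the old timer, register a new one and remember it as the valid one.
    void processElement(String key, long eventTime, long gap) {
        long deadline = eventTime + gap;
        validTimestamp.put(key, deadline);
        registerEventTimeTimer(key, deadline);
    }

    void onTimer(String key, long timestamp) {
        Long valid = validTimestamp.get(key);
        if (valid == null || valid != timestamp) {
            return; // stale timer: a later element replaced the deadline
        }
        validTimestamp.remove(key);
        fired.add(key + "@" + timestamp);
    }

    // Event-time timers fire once the watermark reaches their timestamp, in ascending order per key.
    void advanceWatermark(long watermark) {
        for (Map.Entry<String, TreeSet<Long>> entry : timers.entrySet()) {
            TreeSet<Long> pending = entry.getValue();
            while (!pending.isEmpty() && pending.first() <= watermark) {
                onTimer(entry.getKey(), pending.pollFirst());
            }
        }
    }

    public static void main(String[] args) {
        TimerWorkaroundDemo demo = new TimerWorkaroundDemo();
        demo.processElement("user-1", 10, 5); // timer at 15
        demo.processElement("user-1", 12, 5); // timer at 17; the timer at 15 becomes stale
        demo.advanceWatermark(20);
        System.out.println(demo.fired); // [user-1@17]

        // The "currentWatermark + 1" trick: re-registering the same timestamp keeps one timer.
        long currentWatermark = 20;
        demo.registerEventTimeTimer("user-2", currentWatermark + 1);
        demo.registerEventTimeTimer("user-2", currentWatermark + 1);
        System.out.println(demo.timerCount("user-2")); // 1
    }
}
```

Stale timers still fire, but they return immediately, so the cost is one cheap state lookup per obsolete timer instead of a removal from the queue.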
2017-12-20 22:36 GMT+01:00 Vishal Santoshi <[hidden email]>:
That raises a further question: how performant is the TimerService? I tried to go through the architecture behind it but could not find a definite clue. Is it a scheduled service, and if so, how many threads does it use?


Re: A question about Triggers

Vishal Santoshi

Thanks. 




I have a few follow-up questions regarding ProcessFunction. I would expect the core to take care of any synchronization issues between calls to processElement() and onTimer() on a keyed stream, but my tests do not seem to suggest that.



Specifically, I have two questions.

1. Are calls to processElement(..) single-threaded when scoped to a key? That is, on a keyed stream, is there a way that two or more threads process more than one element of a single key at the same time? Would I have to synchronize this construct:

OUT accumulator = accumulatorState.value();
if (accumulator == null) {
    accumulator = acc.createNew();
}



2. Can calls to onTimer(..) and processElement(..) for the same key happen concurrently? I intend to clean up state, but I see NullPointerExceptions thrown in onTimer(..), and I presume it is because processElement() and onTimer() execute on two separate threads, with onTimer() removing the state (clear()) after another thread has registered a timer (in processElement()).


if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on Race Conditions
    accumulatorState.clear();
}






PS. This is the full code.



@Override
public void processElement(IN event, Context context, Collector<OUT> out) throws Exception {
    TimerService timerService = context.timerService();
    if (context.timestamp() > timerService.currentWatermark()) {
        OUT accumulator = accumulatorState.value();
        if (accumulator == null) {
            accumulator = acc.createNew();
        }
        accumulator.setLastModified(context.timestamp());
        accumulatorState.update(accumulator);
        timerService.registerEventTimeTimer(context.timestamp() + gap);
    }
}

@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out) throws Exception {
    OUT accumulator = accumulatorState.value();
    if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on Race Conditions
        accumulatorState.clear();
    }
}

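A plain-Java replay suggests the NullPointerException does not need a race. Flink does not invoke processElement() and onTimer() of one parallel instance concurrently, and the model below calls them strictly one after another for a single key. Because every element registers its own timer and lastModified is overwritten rather than maxed, an out-of-order element (still above the watermark) lets an earlier timer clear the state, and the later timer then reads null. `SessionTimerReplay`, `Acc` and the `guarded` flag are hypothetical stand-ins; the guarded variant keeps the latest timestamp and returns early when the state is already cleared.

```java
import java.util.TreeSet;

// Hypothetical single-key replay of the posted processElement()/onTimer() logic.
// All calls happen one at a time on one thread: no concurrency is involved.
public class SessionTimerReplay {
    // Stand-in for the accumulator type OUT.
    static final class Acc {
        long lastModified = Long.MIN_VALUE;
    }

    private Acc state;                                    // stand-in for accumulatorState; null == cleared
    private final TreeSet<Long> timers = new TreeSet<>(); // registered event-time timers
    private final long gap;
    private final boolean guarded;                        // false: posted logic, true: guarded variant

    SessionTimerReplay(long gap, boolean guarded) {
        this.gap = gap;
        this.guarded = guarded;
    }

    void processElement(long timestamp, long currentWatermark) {
        if (timestamp > currentWatermark) {
            Acc acc = state;
            if (acc == null) {
                acc = new Acc();
            }
            // Posted logic: an out-of-order element moves lastModified backwards.
            // Guarded variant: keep the latest event time seen in the session.
            acc.lastModified = guarded ? Math.max(acc.lastModified, timestamp) : timestamp;
            state = acc;
            timers.add(timestamp + gap);
        }
    }

    void onTimer(long timestamp) {
        Acc acc = state;
        if (guarded && acc == null) {
            return; // state was already cleared by an earlier timer
        }
        if (timestamp == acc.lastModified + gap) { // throws NullPointerException if acc == null
            state = null;                          // accumulatorState.clear()
        }
    }

    // Fires every timer at or below the watermark, in ascending order.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    boolean isCleared() {
        return state == null;
    }

    public static void main(String[] args) {
        for (boolean guarded : new boolean[] {false, true}) {
            SessionTimerReplay replay = new SessionTimerReplay(5, guarded);
            replay.processElement(20, 0); // timer at 25
            replay.processElement(10, 0); // out of order but above the watermark: timer at 15
            try {
                replay.advanceWatermark(100);
                System.out.println("guarded=" + guarded + ": cleared=" + replay.isCleared());
            } catch (NullPointerException e) {
                System.out.println("guarded=" + guarded + ": NullPointerException");
            }
        }
    }
}
```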


On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske <[hidden email]> wrote:
That's correct. Removal of timers is not supported in ProcessFunction. Not sure why this is supported for Triggers.
The common workaround for ProcessFunctions is to register multiple timers and have a ValueState that stores the valid timestamp on which the onTimer method should be executed.
When a timer fires and calls onTimer(), the method first checks whether the timestamp is the correct one and leaves the method if that is not the case.
If you want to fire on the next watermark, another trick is to register multiple timers on (currentWatermark + 1). Since there is only one timer per key and timestamp, there is effectively a single timer that keeps being re-registered, and it is called when the watermark advances.

On the performance of the timer service: AFAIK, all methods that work with some kind of timer use this service, so there is not much choice.

2017-12-20 22:36 GMT+01:00 Vishal Santoshi <[hidden email]>:
And that further begs the question.. how performant is Timer Service. I tried to peruse through the architecture behind it but cold not find a definite clue. Is it a Scheduled Service and if yes how many threads etc...

On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <[hidden email]> wrote:
Makes sense. Did a first stab at Using ProcessFunction. The TimeService exposed by the Context does not have remove timer. Is it primarily b'coz A Priority Queue is the storage ad remove from a PriorityQueue is expensive ?  Trigger Context does expose another version that has removal abilities so was wondering why this dissonance...

On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <[hidden email]> wrote:
Hi Vishal,

it is not guaranteed that add() and onElement() receive the same object, and even if they do it is not guaranteed that a mutation of the object in onElement() has an effect. The object might have been serialized and stored in RocksDB.
Hence, elements should not be modified in onElement().

Have you considered to implement the operation completely in a ProcessFunction instead of a session window?
This might be more code but easier to design and reason about because there is no interaction of window assigner, trigger, and window function.


2017-12-18 20:49 GMT+01:00 Vishal Santoshi <[hidden email]>:
I guess https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362

is where We could fashion as to what is emitted. Again for us it seems natural to use WM to materialize a micro batches with "approximate" order ( and no I am not a fan of spark micro batches :)). Any pointers as to how we could write an implementation that allows for "up till WM emission" through a trigger on a Session Window would be very helpful. In essence I believe that for any "funnel" analysis it is crucial. 


I know I am simplifying this and there has to be more to it...




On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <[hidden email]> wrote:
The Trigger in this case would be some CountBased Trigger.... Again the motive is the keep the state lean as we desire to search for  patterns, sorted on even time,  in the incoming sessionized ( and thus of un deterministic length ) stream....

On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <[hidden email]> wrote:
For example, this would have worked perfect if it did not complain about MergeableWindow and state. The Session class in this encapsulates the  trim up to watermark behavior ( reduce call after telling it the current WM )  we desire 

public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {

private static final ValueStateDescriptor<Session> sessionState = new ValueStateDescriptor<>("session", Session.class);

@Override
public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {

ValueState<Session> session = context.windowState().getState(sessionState);
Session s = session.value() != null ? session.value() : new Session();
for (Event e : elements) {
s.add(e);
}
s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
s.reduce();
out.collect(s);
session.update(s);
}

@Override
public void clear(Context context){
ValueState<Session> session = context.windowState().getState(sessionState);
session.clear();
}
}



On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <[hidden email]> wrote:
Hello Fabian, Thank you for the response.
 
 I think that does not work, as it is the WM of the Window Operator is what is desired to make deterministic decisions rather than off an operator the precedes the Window ? This is doable using ProcessWindowFunction using state but only in the case of non mergeable windows. 

   The best API  option I think is a TimeBaseTrigger that fires every configured time progression of WM  and a Window implementation that materializes only data up till that WM ( it might have more data but that data has event time grater than the WM ). I am not sure we have that built in option and thus was asking for an access the current WM for the window operator to allow  us handle the "only data up till that WM" range retrieval using some  custom data structure. 

Regards.




On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <[hidden email]> wrote:
Hi Vishal,

the Trigger is not designed to augment records but just to control when a window is evaluated.
I would recommend using a ProcessFunction to enrich records with the current watermark before passing them into the window operator.
The context object of the processElement() method gives access to the current watermark and timestamp of a record.

Please note that watermarks are not deterministic but may depend on the order in which parallel inputs are consumed by an operator.
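To make the non-determinism concrete, here is a simplified model (not Flink's code) assuming the usual rule that an operator's watermark is the minimum of the latest watermarks received on its input channels. The same two watermarks, consumed in a different order relative to a record, leave the operator at a different watermark when that record is processed. `CombinedWatermark` is an illustrative name only.

```java
import java.util.Arrays;

/**
 * Simplified model: the operator's watermark is the minimum of the latest
 * watermark seen on each input channel, and it never moves backwards.
 */
final class CombinedWatermark {
    private final long[] perChannel;
    private long current = Long.MIN_VALUE;

    CombinedWatermark(int channels) {
        perChannel = new long[channels];
        Arrays.fill(perChannel, Long.MIN_VALUE); // no watermark received yet
    }

    /** Records a watermark from one channel and returns the operator's watermark. */
    long onWatermark(int channel, long watermark) {
        perChannel[channel] = Math.max(perChannel[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long w : perChannel) {
            min = Math.min(min, w);
        }
        current = Math.max(current, min);
        return current;
    }

    long current() {
        return current;
    }
}
```

If a record is processed right after channel 0 reports 10, it sees `Long.MIN_VALUE`; if channel 1's watermark of 20 happens to be consumed first, the same record sees 10.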

Best, Fabian

2017-12-17 16:59 GMT+01:00 Vishal Santoshi <[hidden email]>:
An addendum 

Is the element reference IN in onElement(IN element, ...) of Trigger<IN, ...> the same object as the IN provided to add(IN value) of the Accumulator<IN, ...>? It seems that mutations to IN in onElement() are not visible to the accumulator that holds it as the previous element reference, even in the next invocation of add(). This seems to happen only in distributed mode, which makes sense only if these references point to different objects.

The pipeline
.keyBy(keySelector)
.window(EventTimeSessionWindows.<IN>withGap(gap))
.trigger(new CountBasedWMAugmentationTrigger<IN, TimeWindow>(triggerCount))
.aggregate(
        new AggregateFunction<IN, ACC, OUT>() {

            @Override
            public ACC createAccumulator() {
                ACC newInstance = (ACC) accumulator.clone();
                newInstance.resetLocal();
                return newInstance;
            }

            @Override
            public void add(IN value, ACC accumulator) {
                /** This method is called before onElement() of the Trigger and keeps the reference to the last IN **/
                accumulator.add(value);
            }
            .....

The Trigger

public class CountBasedWMAugmentationTrigger<T extends Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window>
        extends Trigger<T, W> {

    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        /** The element T is mutated to carry the watermark **/
        element.setWaterMark(ctx.getCurrentWatermark());

.




On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <[hidden email]> wrote:
I want to augment a POJO in the Trigger's onElement() method, specifically to supply the POJO with the watermark from the TriggerContext. The sequence of execution is:

1. call to add() in the accumulator for the window, which saves the POJO reference in the Accumulator.
2. call to onElement() on the Trigger.
3. set the watermark on the POJO.

The next add() call should see the last reference, including any mutation done in step 3.

That works in a local test case using LocalFlinkMiniCluster (the mutation made in onElement() is visible on the POJO in the subsequent add()), but not on a distributed cluster. My specific question is whether add() on the window's accumulator and onElement() of the trigger on that window are executed inline on the same thread, or whether some serialization/deserialization or IPC causes this divergence (local versus distributed).

Regards.












Re: A question about Triggers

Fabian Hueske-2
Hi,

all calls to onElement() and onTimer() are synchronized across all keys. Think of a single thread calling these methods.
Event-time timers are called when a watermark passes the timer. Watermarks are received as special records, so the methods are called in the same order as records (actual records or watermarks) arrive at the function. Actual synchronization is only required for processing-time timers.
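A toy single-threaded model of that ordering (hypothetical class `EventTimeLoop`; it ignores keys and processing time, and is not Flink's timer service): elements and watermarks arrive through one input, and a watermark fires all due timers in timestamp order before the next input is handled, so no locking is involved. A `TreeSet` keeps one timer per timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Toy model: one thread handles elements, watermarks and event-time timers in arrival order. */
final class EventTimeLoop {
    private final long gap;
    // Pending event-time timers, sorted, one entry per timestamp.
    private final TreeSet<Long> timers = new TreeSet<>();
    private long watermark = Long.MIN_VALUE;
    final List<String> log = new ArrayList<>();

    EventTimeLoop(long gap) {
        this.gap = gap;
    }

    /** Plays the role of processElement(): registers a timer at timestamp + gap. */
    void processElement(String value, long timestamp) {
        log.add("element " + value + "@" + timestamp);
        timers.add(timestamp + gap);
    }

    /** A watermark is just another input: fire every due timer, in order, before returning. */
    void processWatermark(long wm) {
        watermark = Math.max(watermark, wm);
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long t = timers.pollFirst();
            log.add("onTimer " + t);
        }
    }
}
```

With a gap of 10, elements at 1, 1 and 5 followed by watermark 12 fire only the timer at 11; the duplicate registration for 11 collapses into one timer, and the timer at 15 waits for a later watermark.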

The NPE might be thrown because of two timers that fire one after the other without a new record being processed in between the onTimer() calls. In that case the state is cleared in the first call and null in the second.
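A plain-Java reproduction of that scenario for a single key, assuming a gap of 10 and elements with timestamps 5 and then 1 (the watermark check from the original `processElement()` is omitted). Because the original code stores the latest arrival timestamp rather than the largest one, the timer at 11 clears the state and the timer at 15 then dereferences null. The `guarded` variant adds the null check in `onTimer()` and, as an extra assumption beyond the advice above, keeps the maximum timestamp. `SessionCleanupModel` is a hypothetical name; the `Long` field stands in for `accumulatorState`.

```java
import java.util.TreeSet;

/** Toy single-key model of the ProcessFunction above; timers fire back to back in timestamp order. */
final class SessionCleanupModel {
    private final long gap;
    private final boolean guarded;
    private final TreeSet<Long> timers = new TreeSet<>();
    private Long lastModified; // stand-in for accumulatorState; null means "cleared"
    int clears = 0;

    SessionCleanupModel(long gap, boolean guarded) {
        this.gap = gap;
        this.guarded = guarded;
    }

    void processElement(long timestamp) {
        if (guarded && lastModified != null) {
            // Keep the largest event time so an older, late-arriving element cannot end the session early.
            lastModified = Math.max(lastModified, timestamp);
        } else {
            lastModified = timestamp; // first element, or the original behaviour: latest arrival wins
        }
        timers.add(timestamp + gap);
    }

    /** Fires all due timers with no element processed in between. */
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    private void onTimer(long timestamp) {
        if (guarded && lastModified == null) {
            return; // an earlier timer already cleared the state
        }
        // Unguarded: unboxing a null lastModified throws NullPointerException,
        // just like calling getLastModified() on a null accumulator.
        if (timestamp == lastModified + gap) {
            lastModified = null;
            clears++;
        }
    }
}
```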

Best, Fabian

2017-12-23 16:36 GMT+01:00 Vishal Santoshi <[hidden email]>:

Thanks.

I have a few follow-up questions regarding ProcessFunction. I would expect the core to take care of any synchronization issues between calls to onElement and onTimer on a keyed stream, but my tests do not seem to confirm that.

Specifically, I have 2 questions.

1. Are calls to onElement(..) single-threaded when scoped to a key? That is, on a keyed stream, can 2 or more threads process more than one element of a single key at the same time? Would I have to synchronize this construction:

OUT accumulator = accumulatorState.value();
if (accumulator == null) {
    accumulator = acc.createNew();
}

2. Can calls to onTimer(..) and onElement(..) happen concurrently for the same key? I intend to clean up state, but I see NullPointerExceptions thrown in onTimer(..). I presume that is because onElement and onTimer are executed on 2 separate threads, with onTimer removing the state (clear()) after another thread has registered a timer (in onElement).

if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on Race Conditions
    accumulatorState.clear();
}

PS. This is the full code.

@Override
public void processElement(IN event, Context context, Collector<OUT> out) throws Exception {
    TimerService timerService = context.timerService();
    if (context.timestamp() > timerService.currentWatermark()) {
        OUT accumulator = accumulatorState.value();
        if (accumulator == null) {
            accumulator = acc.createNew();
        }
        accumulator.setLastModified(context.timestamp());
        accumulatorState.update(accumulator);
        timerService.registerEventTimeTimer(context.timestamp() + gap);
    }
}

@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out) throws Exception {
    OUT accumulator = accumulatorState.value();
    if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on Race Conditions
        accumulatorState.clear();
    }
}



On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske <[hidden email]> wrote:
That's correct. Removal of timers is not supported in ProcessFunction. Not sure why this is supported for Triggers.
The common workaround for ProcessFunctions is to register multiple timers and have a ValueState that stores the valid timestamp on which the onTimer method should be executed.
When a timer fires and calls onTimer(), the method first checks whether the timestamp is the correct one and leaves the method if that is not the case.
If you want to fire on the next watermark, another trick is to register multiple timers on (currentWatermark + 1). Since there is only one timer per timestamp, there is only one timer which gets continuously overwritten. The timer is called when the watermark is advanced.
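The "valid timestamp" workaround can be sketched without Flink. In this toy model (hypothetical class `LazyTimerDeletion`), every registration stays in the timer set, a separate field plays the role of the `ValueState` holding the one timestamp that should still act, and `onTimer()` returns early for anything else. Registering the same timestamp twice, as with the `currentWatermark + 1` trick, leaves a single timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Toy model of "register many timers, act only on the one recorded as valid". */
final class LazyTimerDeletion {
    private final TreeSet<Long> registered = new TreeSet<>(); // one timer per timestamp
    private Long validTimer;                                  // stand-in for a ValueState<Long>
    final List<Long> acted = new ArrayList<>();

    /** "Replaces" the pending timer: registers a new one and marks it as the only valid one. */
    void reschedule(long timestamp) {
        registered.add(timestamp);
        validTimer = timestamp;
    }

    void advanceWatermark(long watermark) {
        while (!registered.isEmpty() && registered.first() <= watermark) {
            onTimer(registered.pollFirst());
        }
    }

    private void onTimer(long timestamp) {
        if (validTimer == null || timestamp != validTimer) {
            return; // stale timer that would have been deleted: ignore it
        }
        validTimer = null;
        acted.add(timestamp);
    }
}
```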

Regarding the performance of the timer service: AFAIK, all methods that work with some kind of timer use this service, so there is not much choice.

2017-12-20 22:36 GMT+01:00 Vishal Santoshi <[hidden email]>:
And that raises a further question: how performant is the timer service? I tried to go through the architecture behind it but could not find a definite clue. Is it a scheduled service, and if so, how many threads does it use, etc.?

On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <[hidden email]> wrote:
Makes sense. I took a first stab at using a ProcessFunction. The TimerService exposed by the Context does not have a method to remove a timer. Is that primarily because a priority queue is the storage and removal from a PriorityQueue is expensive? The TriggerContext does expose a version with removal abilities, so I was wondering about this dissonance.

On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <[hidden email]> wrote:
Hi Vishal,

it is not guaranteed that add() and onElement() receive the same object, and even if they do it is not guaranteed that a mutation of the object in onElement() has an effect. The object might have been serialized and stored in RocksDB.
Hence, elements should not be modified in onElement().
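The effect can be reproduced without Flink. The sketch below uses plain Java serialization as a stand-in for a state backend's serializer (Flink uses its own TypeSerializers, so this is only an analogy; `CopyOnSerialize` is a hypothetical name): once the object has been written out and read back, mutating the original reference does not change the stored copy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Illustration: a serialized-then-deserialized value is a different object from the original. */
final class CopyOnSerialize {
    static final class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        long watermark = Long.MIN_VALUE;
    }

    /** Writes the value to bytes and reads it back, as a serializing state backend would. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Setting `watermark` on the original after the round trip (the analogue of mutating the element in `onElement()`) leaves the stored copy untouched.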

Have you considered implementing the operation completely in a ProcessFunction instead of a session window?
This might be more code, but it is easier to design and reason about because there is no interaction of window assigner, trigger, and window function.


2017-12-18 20:49 GMT+01:00 Vishal Santoshi <[hidden email]>:
I guess https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362

is where we could shape what is emitted. Again, for us it seems natural to use the WM to materialize micro-batches with "approximate" order (and no, I am not a fan of Spark micro-batches :)). Any pointers on how we could write an implementation that allows "up to WM" emission through a trigger on a session window would be very helpful. In essence, I believe this is crucial for any "funnel" analysis.


I know I am simplifying this and there has to be more to it...





Re: A question about Triggers

Vishal Santoshi
This makes sense.  Thanks.


Re: A question about Triggers

Vishal Santoshi
Dear Fabian,

I was able to create a fairly functional ProcessFunction. Here is the synopsis; please let me know if it makes sense.

Sessionization is unique in that it entails windows of dynamic length. Flink's approach is pretty simple: it creates a TimeWindow of size "gap" starting at the event time, finds overlapping windows (intersections), and creates a covering window. Each such window has a "state" associated with it, which also has to be merged when a covering window is created from the intersection of 2 or more incident windows. To be more precise, if Window1 spans (t1, t2) and a new record creates a window (t3, t4) with t1 <= t3 <= t2, a new window (t1, max(t2, t4)) is created and the associated states are merged.
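A small sketch of that merging step, modelled on (not copied from) Flink's TimeWindow: each element opens [ts, ts + gap), windows are sorted by start, and any window that intersects the current one is folded into a covering window [min start, max end). The comparison is inclusive, so windows that merely touch are merged as well. `SessionMerge` and its nested `Window` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Sketch of session-window merging by covering intersecting [start, end) windows. */
final class SessionMerge {
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        boolean intersects(Window o) {
            return start <= o.end && o.start <= end;
        }

        Window cover(Window o) {
            return new Window(Math.min(start, o.start), Math.max(end, o.end));
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** One window per element, then merge overlapping windows into sessions. */
    static List<Window> sessions(long[] timestamps, long gap) {
        List<Window> windows = new ArrayList<>();
        for (long ts : timestamps) {
            windows.add(new Window(ts, ts + gap));
        }
        windows.sort(Comparator.comparingLong(w -> w.start));
        List<Window> merged = new ArrayList<>();
        Window current = null;
        for (Window w : windows) {
            if (current == null) {
                current = w;
            } else if (current.intersects(w)) {
                current = current.cover(w); // state attached to both windows would be merged here
            } else {
                merged.add(current);
                current = w;
            }
        }
        if (current != null) {
            merged.add(current);
        }
        return merged;
    }
}
```

With a gap of 10, timestamps 1, 5, 30 and 12 produce two sessions, [1, 22) and [30, 40).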


In the current Window API, the states are external and accumulator-based. This approach works for pretty much all cases where the aggregation is accumulative/reducing and does not depend on order, i.e. no ordered list of incoming records needs to be kept and the reduction is to a single aggregated element (think counts, min, max, etc.). In path analysis (and other use cases), however, this approach has drawbacks. Even though our accumulator could keep an ordered list of events, that becomes unreasonable if it is not bounded. One approach that does attempt to bound the state is to preemptively analyze paths, using the WM as the marker that defines the subset of the state that is safe to analyze. So if we have n events in the window state and m of them fall before the WM, we can safely analyze that subset of m events, emitting the paths seen and reducing the cumulative state size. There are caveats, though, which I will go into later.


Unfortunately, the accumulators in the default Flink window runtime do not have access to the WM.


This led to the following generic approach (implemented and tested):


* Use a low-level ProcessFunction, which gives access to the WM and is definitely nearer to the guts of Flink.


* Still use the merge-windows-on-intersection approach, but use the WM to trigger (through timers) reductions in state. This is not very different from what Flink does, but we have more control over what to do and when to do it. Essentially, we have exposed a lifecycle method that reacts to WM progression.


* There are essentially 2 timers. The first is at the maxTimestamp() of a window; if there is no further mutation (because of a merge etc.), it fires to signal a session end. The second is at currentWatermark + 1 and essentially calls a "reduceToWM" on each keyed window and thus on its state.


* There are 2 ways to short-circuit a session: 1. on session time span, 2. on session size.


* There is a safety valve to blacklist keys when it is obvious that it is a bot ( again 


The solution thus preemptively pushes out patterns (and correct patterns) while keeping the ordered state within reasonable bounds. The incident data of course has to be analyzed (are the paths too large, etc.), but one has full control over how to fashion the solution.
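The short-circuit rules and the blacklist safety valve described above could look roughly like this. It is a sketch under assumptions: `SessionGuard`, its `Decision` values and the thresholds are hypothetical, and a real job would keep the blacklist and session bounds in Flink state rather than in a plain `HashSet`.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical guard: close a session early when its event-time span or its size
 * exceeds a limit, and drop events for blacklisted keys (e.g. suspected bots).
 */
final class SessionGuard {
    enum Decision { ACCEPT, CLOSE_SESSION, DROP }

    private final long maxSpan;
    private final int maxSize;
    private final Set<String> blacklist = new HashSet<>();

    SessionGuard(long maxSpan, int maxSize) {
        this.maxSpan = maxSpan;
        this.maxSize = maxSize;
    }

    void blacklist(String key) {
        blacklist.add(key);
    }

    /** Decides what to do with a session after an event at eventTime has been added. */
    Decision check(String key, long sessionStart, long eventTime, int sessionSize) {
        if (blacklist.contains(key)) {
            return Decision.DROP;
        }
        if (eventTime - sessionStart > maxSpan || sessionSize > maxSize) {
            return Decision.CLOSE_SESSION;
        }
        return Decision.ACCEPT;
    }
}
```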




Regards and Thanks,


Vishal









On Wed, Dec 27, 2017 at 10:41 AM, Vishal Santoshi <[hidden email]> wrote:
This makes sense.  Thanks.

On Sat, Dec 23, 2017 at 10:58 AM, Fabian Hueske <[hidden email]> wrote:
Hi,

all calls to onElement() or onTimer() are syncronized for any keys. Think of a single thread calling these methods.
Event-time timers are called when a watermark passes the timer. Watermarks are received as special records, so the methods are called in the same order as records (actual records or watermarks) arrive at the function. Only for processing-time timers, actual synchronization is required.

The NPE might be thrown because of two timers that fire one after the other without a new record being processed in between the onTimer() calls. In that case the state is cleared in the first call and null in the second.

Best, Fabian

2017-12-23 16:36 GMT+01:00 Vishal Santoshi <[hidden email]>:

Thanks. 




    I have a few follow up questions regarding ProcessFunction. I think that the core should take care of any synchronization issues between calls to onElement and onTimer in case of a keyed stream but tests do not seem to suggest that.



I have  specifically 2 questions.


1.  Are calls  to onElement(..) single threaded if scoped to a key ? As in on a keyed stream, is there a  way that 2 or more threads can execute on the more than one element of a single key at one time ? Would I have to synchronize this construction     

OUT accumulator = accumulatorState.value();
if (accumulator == null) {
accumulator = acc.createNew();
}



2. Can concurrent calls happen  onTimer(..) and onElement(..) for the same key ? I intend to clean up state but I see  NullPointers in OnTimer(..) thrown and I presume it is b'coz the onElement and onTimer are executed on 2  separate threads, with on Timer removing the state ( clear() ) but after another thread has registered a Timer ( in onElement ).


if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on Race Conditions
accumulatorState.clear();
}






PS. This is the full code.



@Override
public void processElement(IN event, Context context, Collector<OUT> out) throws Exception {
TimerService timerService = context.timerService();
if (context.timestamp() > timerService.currentWatermark()) {
OUT accumulator = accumulatorState.value();
if (accumulator == null) {
accumulator = acc.createNew();
}
accumulator.setLastModified(context.timestamp());
accumulatorState.update(accumulator);
timerService.registerEventTimeTimer(context.timestamp() + gap);
}
}

@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out) throws Exception {
OUT accumulator = accumulatorState.value();
if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on Race Conditions
accumulatorState.clear();
}
}



On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske <[hidden email]> wrote:
That's correct. Removal of timers is not supported in ProcessFunction. Not sure why this is supported for Triggers.
The common workaround for ProcessFunctions is to register multiple timers and have a ValueState that stores the valid timestamp on which the onTimer method should be executed.
When a timer fires and calls onTimer(), the method first checks whether the timestamp is the correct one and leaves the method if that is not the case.
If you want to fire on the next watermark, another trick is to register multiple timers on (currentWatermark + 1). Since there is only one timer per timestamp, there is only one timer which gets continuously overwritten. The timer is called when the watermark is advanced.

On the performance of the timer service: AFAIK, all methods that work with some kind of timer use this service, so there is not much choice.
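To make the workaround above concrete, here is a small plain-Java simulation with no Flink dependency. SimulatedTimers and ValidTimestampGuard are hypothetical names; the TreeSet only models the two properties described in this mail (one timer per timestamp, and event-time timers firing in order once the watermark reaches them). In a real ProcessFunction the validTimestamp field would presumably be a ValueState<Long>.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical stand-in for one key's event-time timer service. It models only
// two properties: one timer per timestamp, and timers firing in timestamp order
// once the watermark reaches them.
class SimulatedTimers {
    private final TreeSet<Long> timers = new TreeSet<>();

    void register(long timestamp) {
        timers.add(timestamp); // registering the same timestamp again is a no-op
    }

    int pending() {
        return timers.size();
    }

    // Removes and returns every timer with timestamp <= watermark, in order.
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }
}

// Workaround for the missing delete: remember the single valid timestamp
// (a ValueState<Long> in a real ProcessFunction) and ignore every other timer.
class ValidTimestampGuard {
    private Long validTimestamp; // null means no timer is expected
    final List<Long> handled = new ArrayList<>();

    void onElement(long eventTime, long gap, SimulatedTimers timers) {
        validTimestamp = eventTime + gap; // supersedes older timers without deleting them
        timers.register(validTimestamp);
    }

    void onTimer(long timestamp) {
        if (validTimestamp == null || timestamp != validTimestamp) {
            return; // stale timer: leave the method early
        }
        handled.add(timestamp);
        validTimestamp = null;
    }
}
```

Registering currentWatermark + 1 several times collapses into a single pending timer, which is the "fire on the next watermark" trick; the guard is what makes superseded timers harmless.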

2017-12-20 22:36 GMT+01:00 Vishal Santoshi <[hidden email]>:
And that further raises the question: how performant is the TimerService? I tried to peruse the architecture behind it but could not find a definite clue. Is it a scheduled service, and if yes, how many threads etc.?

On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <[hidden email]> wrote:
Makes sense. Did a first stab at using ProcessFunction. The TimerService exposed by the Context does not have a way to remove a timer. Is it primarily because a priority queue is the storage and removal from a PriorityQueue is expensive? The TriggerContext does expose a version that has removal abilities, so I was wondering why this dissonance...

On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <[hidden email]> wrote:
Hi Vishal,

it is not guaranteed that add() and onElement() receive the same object, and even if they do it is not guaranteed that a mutation of the object in onElement() has an effect. The object might have been serialized and stored in RocksDB.
Hence, elements should not be modified in onElement().

Have you considered implementing the operation completely in a ProcessFunction instead of a session window?
This might be more code but easier to design and reason about because there is no interaction of window assigner, trigger, and window function.
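A minimal sketch, assuming a backend that stores elements in serialized form (as RocksDB does): once the element has been written, later mutations of the original reference (the onElement() step in this thread) never reach the stored copy. Element and SerializingState are hypothetical names, and plain Java serialization stands in for Flink's own serializers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical element, modelled loosely on the HasWaterMark POJO in this thread.
class Element implements Serializable {
    private static final long serialVersionUID = 1L;
    long waterMark = Long.MIN_VALUE;
}

// Hypothetical state that, like a serializing backend, keeps bytes, not references.
class SerializingState {
    private byte[] bytes;

    void add(Element e) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(e); // snapshot of the element at this moment
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        bytes = bos.toByteArray();
    }

    Element get() {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Element) in.readObject(); // always a fresh copy, never the original reference
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        } catch (ClassNotFoundException ex) {
            throw new IllegalStateException(ex);
        }
    }
}
```

A store that kept the plain reference would show the mutation; this one cannot, which is why relying on it only works by accident.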


2017-12-18 20:49 GMT+01:00 Vishal Santoshi <[hidden email]>:
I guess https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362

is where we could control what is emitted. Again, for us it seems natural to use the WM to materialize micro-batches with "approximate" order (and no, I am not a fan of Spark micro-batches :)). Any pointers on how we could write an implementation that allows "up till WM" emission through a trigger on a session window would be very helpful. In essence, I believe it is crucial for any "funnel" analysis.


I know I am simplifying this and there has to be more to it...




On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <[hidden email]> wrote:
The Trigger in this case would be some count-based Trigger.... Again, the motive is to keep the state lean, as we desire to search for patterns, sorted on event time, in the incoming sessionized (and thus of nondeterministic length) stream....

On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <[hidden email]> wrote:
For example, this would have worked perfectly if it did not complain about MergeableWindow and state. The Session class in this encapsulates the trim-up-to-watermark behavior we desire (a reduce call after telling it the current WM).

public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {

    private static final ValueStateDescriptor<Session> sessionState = new ValueStateDescriptor<>("session", Session.class);

    @Override
    public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {

        ValueState<Session> session = context.windowState().getState(sessionState);
        Session s = session.value() != null ? session.value() : new Session();
        for (Event e : elements) {
            s.add(e);
        }
        s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
        s.reduce();
        out.collect(s);
        session.update(s);
    }

    @Override
    public void clear(Context context) {
        ValueState<Session> session = context.windowState().getState(sessionState);
        session.clear();
    }
}



On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <[hidden email]> wrote:
Hello Fabian, Thank you for the response.
 
I think that does not work, as it is the WM of the window operator that is needed to make deterministic decisions, rather than the WM of an operator that precedes the window. This is doable using a ProcessWindowFunction with state, but only in the case of non-mergeable windows.

The best API option, I think, is a TimeBaseTrigger that fires on every configured progression of the WM, and a Window implementation that materializes only data up to that WM (it might hold more data, but that data has event time greater than the WM). I am not sure we have that as a built-in option, and thus I was asking for access to the current WM of the window operator, to allow us to handle the "only data up to that WM" range retrieval using some custom data structure.
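The "only data up till that WM" retrieval could be sketched with a TreeMap keyed on event time. WatermarkBuffer is a hypothetical helper, not a Flink API; it assumes the caller already knows the current watermark (for example from a ProcessFunction context) and drains only the prefix the watermark has passed, leaving later events buffered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical "custom data structure": buffers events by event time and hands
// out only the prefix that the watermark has already passed.
class WatermarkBuffer<E> {
    private final TreeMap<Long, List<E>> byTime = new TreeMap<>();

    void add(long eventTime, E event) {
        byTime.computeIfAbsent(eventTime, t -> new ArrayList<>()).add(event);
    }

    // Removes and returns, in event-time order, every event with time <= watermark.
    // Events later than the watermark stay buffered for a future call.
    List<E> drainUpTo(long watermark) {
        NavigableMap<Long, List<E>> ready = byTime.headMap(watermark, true);
        List<E> out = new ArrayList<>();
        for (List<E> events : ready.values()) {
            out.addAll(events);
        }
        ready.clear(); // the view is backed by byTime, so this removes the drained entries
        return out;
    }

    int size() {
        int n = 0;
        for (List<E> events : byTime.values()) {
            n += events.size();
        }
        return n;
    }
}
```

Because the map stays sorted, each drain costs roughly the number of released events plus a log-time lookup, and the retained state is only what lies beyond the watermark.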

Regards.




On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <[hidden email]> wrote:
Hi Vishal,

the Trigger is not designed to augment records but just to control when a window is evaluated.
I would recommend using a ProcessFunction to enrich records with the current watermark before passing them into the window operator.
The context object of the processElement() method gives access to the current watermark and timestamp of a record.

Please note that watermarks are not deterministic but may depend on the order in which parallel inputs are consumed by an operator.

Best, Fabian
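A rough illustration of why the stamped watermark is not deterministic: an operator's watermark is the minimum over its input channels, so the same records can carry different values depending on arrival order. WatermarkStamper is a hypothetical plain-Java model; in a real job the value would come from ctx.timerService().currentWatermark() inside processElement().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of an operator with several input channels. Its current
// watermark is the minimum of the latest watermark seen on each channel, and
// each record is "enriched" with whatever that value is when it arrives.
class WatermarkStamper {
    private final long[] channelWatermarks;
    final List<Long> stamps = new ArrayList<>();

    WatermarkStamper(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing seen yet
    }

    void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    void onRecord() {
        stamps.add(currentWatermark()); // the value the record would carry downstream
    }
}
```

With two channels that both send watermark 10, a record arriving after both watermarks is stamped 10, while the same record arriving between them is stamped Long.MIN_VALUE.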

2017-12-17 16:59 GMT+01:00 Vishal Santoshi <[hidden email]>:
An addendum 

Is the element reference IN in onElement(IN element..) in Trigger<IN,..> the same as the IN provided to add(IN value) in Accumulator<IN,..>? It seems that any mutation to IN in onElement() is not visible to the Accumulator that carries it as a previous element reference, in the next invocation of add(). This seems to happen only in distributed mode, which makes sense only if these references point to different objects.

The pipeline
.keyBy(keySelector)
.window(EventTimeSessionWindows.<IN>withGap(gap))
.trigger(new CountBasedWMAugmentationTrigger<IN,TimeWindow>(triggerCount))
.aggregate(
    new AggregateFunction<IN, ACC, OUT>() {

        @Override
        public ACC createAccumulator() {
            ACC newInstance = (ACC) accumulator.clone();
            newInstance.resetLocal();
            return newInstance;
        }

        @Override
        public void add(IN value, ACC accumulator) {
            /** This method is called before onElement of the Trigger and keeps the reference to the last IN **/
            accumulator.add(value);
        }
.....

The Trigger

public class CountBasedWMAugmentationTrigger<T extends Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window>
        extends Trigger<T, W> {

    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        /** The element T is mutated to carry the watermark **/
        element.setWaterMark(ctx.getCurrentWatermark());

.




On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <[hidden email]> wrote:
I want to augment a POJO in the Trigger's onElement method, specifically to supply the POJO with the watermark from the TriggerContext. The sequence of execution is:

1. call to add() in the accumulator for the window, saving the POJO reference in the Accumulator.
2. call to onElement on the Trigger
3. set the watermark on the POJO

The next add() method should have the last reference and any mutation done in step 3.

That works in a local test case using LocalFlinkMiniCluster, in that the mutation made by onElement() to the POJO is visible in the subsequent add(), but not on a distributed cluster. The specific question I have is whether add() on a supplied accumulator of a window and the onElement() method of the trigger on that window are inline executions on the same thread, or whether there is some serialization/deserialization IPC that causes this divergence (local versus distributed).

Regards.















Re: A question about Triggers

Fabian Hueske-2
Hi Vishal,

thanks for sharing your solution!

Looking at this issue again, and at your mail in which you shared your SessionProcessWindow ProcessWindowFunction, I'm wondering why you need the ValueState that prevents the ProcessWindowFunction from being used in a mergeable window.
You could have created a new Session object in each invocation of the ProcessWindowFunction and simply kept the elements in the (mergeable) list state of the window.
In that case you would simply need a custom trigger that calls the ProcessWindowFunction when a new watermark arrives. For intermediate calls you just FIRE, and for the final call you FIRE_AND_PURGE to remove the elements from the window's state.
Did you try that?

Best, Fabian
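The decision logic of such a trigger might look roughly like this. Decision and WatermarkFiringTrigger are hypothetical stand-ins (Flink's own type is TriggerResult), and the sketch assumes something calls onWatermark() whenever the watermark advances; in a real Trigger that would presumably be an event-time timer registered at currentWatermark + 1. Merging-window support (canMerge()/onMerge()) is left out.

```java
// Hypothetical mirror of the outcomes a Flink Trigger can return (not Flink's enum).
enum Decision { CONTINUE, FIRE, FIRE_AND_PURGE }

// Sketch of the suggested trigger for one session window: an intermediate FIRE
// each time the watermark advances, and FIRE_AND_PURGE once the watermark
// reaches the window's max timestamp.
class WatermarkFiringTrigger {
    private final long windowMaxTimestamp; // end - 1, like TimeWindow.maxTimestamp()
    private long lastSeenWatermark = Long.MIN_VALUE;

    WatermarkFiringTrigger(long windowMaxTimestamp) {
        this.windowMaxTimestamp = windowMaxTimestamp;
    }

    Decision onWatermark(long watermark) {
        if (watermark >= windowMaxTimestamp) {
            return Decision.FIRE_AND_PURGE; // final evaluation, drop the window contents
        }
        if (watermark > lastSeenWatermark) {
            lastSeenWatermark = watermark;
            return Decision.FIRE; // intermediate evaluation, elements stay in the window
        }
        return Decision.CONTINUE; // watermark did not move, nothing to do
    }
}
```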



2018-01-03 15:57 GMT+01:00 Vishal Santoshi <[hidden email]>:
Dear Fabian,

I was able to create a pretty functional ProcessFunction. Here is the synopsis; please see if it makes sense.

Sessionization is unique in that it entails windows of dynamic length. Flink's approach is pretty simple. It creates a TimeWindow of size "gap" relative to the event time, finds an overlapping window (intersection), and creates a covering window. Each such window has a "state" associated with it, which also has to be merged when a covering window is created on the intersection of 2 or more incident windows. To be more precise, if Window1 spans (t1, t2), a new record creates a window (t3, t4), and t1 <= t3 <= t2, then a new window (t1, t4) is created and the associated states are merged.
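The intersect-and-cover step described above can be sketched in plain Java. Span and SessionMerger are hypothetical names; intersects() and cover() are modelled on Flink's TimeWindow methods of the same name (to the best of my knowledge they use the same inclusive comparison, so windows that merely touch are merged too). Per-window state merging is only marked by a comment.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical [start, end) interval standing in for a session TimeWindow.
final class Span {
    final long start;
    final long end;

    Span(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // Inclusive comparison: touching windows count as overlapping.
    boolean intersects(Span other) {
        return start <= other.end && end >= other.start;
    }

    // Smallest window containing both.
    Span cover(Span other) {
        return new Span(Math.min(start, other.start), Math.max(end, other.end));
    }
}

class SessionMerger {
    // Each event opens [t, t + gap); overlapping windows are replaced by their cover.
    static List<Span> sessionize(long[] eventTimes, long gap) {
        List<Span> windows = new ArrayList<>();
        for (long t : eventTimes) {
            windows.add(new Span(t, t + gap));
        }
        windows.sort(Comparator.comparingLong((Span w) -> w.start));

        List<Span> merged = new ArrayList<>();
        for (Span w : windows) {
            int last = merged.size() - 1;
            if (last >= 0 && merged.get(last).intersects(w)) {
                // A real operator would merge the two windows' states here as well.
                merged.set(last, merged.get(last).cover(w));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }
}
```

With gap 10, events at 1 and 5 open (1, 11) and (5, 15), which merge into (1, 15), matching the (t1, t4) case above; an event at 30 starts a separate session (30, 40).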


In the current Window API the states are external and accumulator based. This approach works for pretty much all cases where the aggregation is accumulative/reduced and does not depend on order, i.e. no ordered list of incoming records needs to be kept and the reduction is to a single aggregated element (think counts, min, max etc.). In path analysis (and other use cases), however, this approach has drawbacks. Even though our accumulator could keep an ordered list of events, that becomes unreasonable if it is not bounded. An approach that does attempt to bound the state is to preemptively analyze paths using the WM as the marker that defines the subset of the state that is safe to analyze. So if we have n events in the window state and m of them fall before the WM, we can safely analyze that subset of m, emitting the paths seen and reducing the cumulative state size. There are caveats, though, that I will go into later.


Unfortunately, the accumulators in the Flink window runtime do not have access to the WM.


This led to the following generic approach (implemented and tested):


* Use a low-level ProcessFunction, which allows access to the WM and is definitely nearer to the guts of Flink.


* Still use the merge-windows-on-intersection approach, but use the WM to trigger (through timers) reductions in state. This is not very dissimilar to what Flink does, but we have more control over what to do and when to do it. Essentially, I have exposed a lifecycle method that reacts to WM progression.


* There are essentially 2 timers. The first timer is at the maxTimestamp() of a window; if there is no further mutation because of a merge etc., it fires to reflect a session end. The second one is on currentWaterMark + 1 and essentially calls a "reduceToWM" on each keyed window and thus its state.


* There are 2 ways to short-circuit a session: 1. on session time span, 2. on session size.


* There is a safety valve to blacklist keys when it is obvious that it is a bot ( again 


The solution thus preemptively pushes out patterns (and correct patterns) while keeping the ordered state within reasonable bounds. The incident data of course has to be analyzed (are the paths too large, etc.), but one has full control over how to fashion the solution.
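One way to tell the two timers apart inside onTimer(), together with the two short-circuit rules, sketched under assumptions: TimerKind and SessionPolicy are hypothetical, the maxSpan/maxSize limits are placeholders, and classifying by the current watermark rather than by the fired timestamp is a design choice made here so that a stale maxTimestamp() timer left behind by a merge is treated as a harmless reduce.

```java
// Hypothetical labels for the two kinds of timers described above.
enum TimerKind { SESSION_END, REDUCE_TO_WM }

// Hypothetical policy object; the limits are placeholders, not recommendations.
class SessionPolicy {
    private final long maxSpan;
    private final int maxSize;

    SessionPolicy(long maxSpan, int maxSize) {
        this.maxSpan = maxSpan;
        this.maxSize = maxSize;
    }

    // Decide by the current watermark rather than by the fired timestamp, so a stale
    // maxTimestamp() timer of a pre-merge window only triggers a reduce.
    static TimerKind classify(long currentWatermark, long windowMaxTimestamp) {
        return currentWatermark >= windowMaxTimestamp ? TimerKind.SESSION_END : TimerKind.REDUCE_TO_WM;
    }

    // The two short-circuit rules: session time span and session size.
    boolean shouldShortCircuit(long sessionStart, long sessionEnd, int eventCount) {
        return sessionEnd - sessionStart > maxSpan || eventCount > maxSize;
    }
}
```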




Regards and Thanks,


Vishal









On Wed, Dec 27, 2017 at 10:41 AM, Vishal Santoshi <[hidden email]> wrote:
This makes sense.  Thanks.

On Sat, Dec 23, 2017 at 10:58 AM, Fabian Hueske <[hidden email]> wrote:
Hi,

all calls to onElement() or onTimer() are synchronized across all keys. Think of a single thread calling these methods.
Event-time timers are called when a watermark passes the timer. Watermarks are received as special records, so the methods are called in the same order as records (actual records or watermarks) arrive at the function. Only for processing-time timers is actual synchronization required.

The NPE might be thrown because of two timers that fire one after the other without a new record being processed in between the onTimer() calls. In that case the state is cleared in the first call and null in the second.

Best, Fabian
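A plain-Java sketch of that situation against the code posted in this thread, assuming single-threaded calls as described: an out-of-order element (still ahead of the watermark) moves lastModified backwards, so the earlier timer clears the state and the later one finds it empty. GuardedSessionFunction is hypothetical; the null check in onTimer() is the essential difference from the posted code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-key stand-in for the posted ProcessFunction.
// 'state' plays the role of accumulatorState; every call happens on one thread.
class GuardedSessionFunction {
    static final class Acc {
        long lastModified;
    }

    private final long gap;
    private Acc state; // null after clear(), as ValueState.value() would return
    final List<Long> registeredTimers = new ArrayList<>();
    int clears;

    GuardedSessionFunction(long gap) {
        this.gap = gap;
    }

    void processElement(long timestamp) {
        if (state == null) {
            state = new Acc();
        }
        state.lastModified = timestamp; // an out-of-order element moves this backwards
        registeredTimers.add(timestamp + gap);
    }

    void onTimer(long timestamp) {
        Acc acc = state;
        if (acc == null) {
            return; // an earlier timer already cleared the state
        }
        if (timestamp == acc.lastModified + gap) {
            state = null; // accumulatorState.clear()
            clears++;
        }
    }
}
```

With gap 10, elements at 105 and then 100 register timers 115 and 110; timer 110 clears the state, and without the guard timer 115 would dereference null.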

2017-12-23 16:36 GMT+01:00 Vishal Santoshi <[hidden email]>:

Thanks. 




I have a few follow-up questions regarding ProcessFunction. I think the core should take care of any synchronization issues between calls to onElement and onTimer in the case of a keyed stream, but tests do not seem to suggest that.



I have 2 specific questions.


1. Are calls to onElement(..) single-threaded if scoped to a key? As in, on a keyed stream, is there a way that 2 or more threads can execute on more than one element of a single key at the same time? Would I have to synchronize this construction:

OUT accumulator = accumulatorState.value();
if (accumulator == null) {
    accumulator = acc.createNew();
}




Re: A question about Triggers

Vishal Santoshi
Hello Fabian, thank you for your response.

I thought about it and maybe I am missing something obvious here. The code below is what I think you suggest. The issue is that the window now is a list of Sessions (or rather subsets of the Session).

What is required is that on a new watermark we

* sort these Session objects
* get the subset that falls before the new watermark and emit it without purging.

I do not see how the Trigger approach helps us. It does tell us that the watermark has progressed, but to get the subset of the ListState that falls before the watermark, we would need access to the new value of the watermark. That was what my initial query was about.



public class SessionProcessWindow<IN extends HasTime & HasKey, OUT extends SessionState<IN, OUT>> extends ProcessWindowFunction<IN, OUT, String, TimeWindow> {

    OUT toCreateNew;
    Long gap;
    private final ListStateDescriptor<OUT> mergingSetsStateDescriptor;

    public SessionProcessWindow(TypeInformation<OUT> aggregationResultType,
                                OUT toCreateNew) {
        this.toCreateNew = toCreateNew;
        mergingSetsStateDescriptor =
                new ListStateDescriptor<>("sessions", aggregationResultType);
    }

    @Override
    public void process(String s, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception {
        OUT session = toCreateNew.createNew();
        elements.forEach(f -> session.add(f));
        context.windowState().getListState(mergingSetsStateDescriptor).add(session);
    }
}

On Fri, Jan 5, 2018 at 7:35 AM, Fabian Hueske <[hidden email]> wrote:
Hi Vishal,

thanks for sharing your solution!

Looking at this issue again, and at your mail in which you shared your SessionProcessWindow ProcessWindowFunction, I'm wondering why you need the ValueState that prevents the ProcessWindowFunction from being used in a mergeable window.
You could have created a new Session object in each invocation of the ProcessWindowFunction and simply kept the elements in the (mergeable) list state of the window.
In that case you would simply need a custom trigger that calls the ProcessWindowFunction when a new watermark arrives. For intermediate calls you just FIRE, and for the final call you FIRE_AND_PURGE to remove the elements from the window's state.
Did you try that?

Best, Fabian



2018-01-03 15:57 GMT+01:00 Vishal Santoshi <[hidden email]>:
Dear Fabian,

           I was able to create a pretty functional ProcessFunction and here is the synopsis and please see if it makes sense.

Sessionization is unique as in it entails windows of dynamic length. The way flink approaches is pretty simple. It will create a TimeWindow of size "gap" relative to the event time, find an overlapping window ( intersection ) and create a covering window. Each such window has a "state" associated with it, which too has to be merged when a cover window is created on intersection of 2 or more incident windows.To be more precise if Window1 spans ( t1, t2 ) and a new record creates a window ( t3, t4 ) and  t1<=t3<=t2 a new Window is created ( t1, t4 ) and the associated states are merged. 


In the current Window API the states are external and are Accumulator based. This approach pretty much works for all cases where the aggregation is accumulative/reduced  and does not depend on order, as in no order list of incoming records needs to be kept and reduction is to a single aggregated element ( think counts, min max etc). In path analysis ( and other use cases ) however this approach has drawbacks. Even though in our accumulator we could keep an ordered list of events it becomes unreasonable if not within bounds. An approach that does attempt to bind state, is to preemptively analyze paths using the WM as the marker that defines the subset of the state that is safe to analyze. So if we have n events in the window state and m fall before WM, we can safely analyze the m subset, emitting paths seen and reducing the cumulative state size. There are caveats though that I will go into later. 


Unfortunately the Accumulators in Flink Window runtime defaults do not have access to the WM. 


This lead to this generic approach  ( implemented and tested ) 


* Use a low level ProcessFunction that allows access to WM and definitely nearer to the guts of Flink.


* Still use the merge Windows on intersection approach but use WM to trigger ( through Timers)  reductions in state. This is not very dissimilar to what Flink does but we have more control over what to do and when to do it. Essentially have exposed a lifecycle method that reacts to WM progression.  


* There are essentially 2 Timers. The first timer is the maxTimeStamp() of a Window, which if there is no further mutation b'coz of merge etc will fire to reflect a Session End. The second one is  on currentWaterMark+1 that essentially calls a "reduceToWM" on each keyed Window and thus State.


* There are 2 ways to short circuit a Session 1. On Session time span 2. On Session size. 


* There is a safety valve to blacklist keys when it is obvious that it is a bot ( again 


The solution will thus preemptively push out Patterns ( and correct patterns ) while keeping the ordered state within reasonable bounds. The incident data of course has to be analyzed . Are the paths to large etc. But one has full control over how to fashion the solution.




Regards and Thanks,


Vishal









On Wed, Dec 27, 2017 at 10:41 AM, Vishal Santoshi <[hidden email]> wrote:
This makes sense.  Thanks.

On Sat, Dec 23, 2017 at 10:58 AM, Fabian Hueske <[hidden email]> wrote:
Hi,

all calls to onElement() or onTimer() are syncronized for any keys. Think of a single thread calling these methods.
Event-time timers are called when a watermark passes the timer. Watermarks are received as special records, so the methods are called in the same order as records (actual records or watermarks) arrive at the function. Only for processing-time timers, actual synchronization is required.

The NPE might be thrown because of two timers that fire one after the other without a new record being processed in between the onTimer() calls. In that case the state is cleared in the first call and null in the second.

Best, Fabian

2017-12-23 16:36 GMT+01:00 Vishal Santoshi <[hidden email]>:

Thanks. 




    I have a few follow up questions regarding ProcessFunction. I think that the core should take care of any synchronization issues between calls to onElement and onTimer in case of a keyed stream but tests do not seem to suggest that.



I have  specifically 2 questions.


1.  Are calls  to onElement(..) single threaded if scoped to a key ? As in on a keyed stream, is there a  way that 2 or more threads can execute on the more than one element of a single key at one time ? Would I have to synchronize this construction     

OUT accumulator = accumulatorState.value();
if (accumulator == null) {
accumulator = acc.createNew();
}



2. Can concurrent calls happen  onTimer(..) and onElement(..) for the same key ? I intend to clean up state but I see  NullPointers in OnTimer(..) thrown and I presume it is b'coz the onElement and onTimer are executed on 2  separate threads, with on Timer removing the state ( clear() ) but after another thread has registered a Timer ( in onElement ).


if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on Race Conditions
accumulatorState.clear();
}






PS. This is the full code.



@Override
public void processElement(IN event, Context context, Collector<OUT> out) throws Exception {
TimerService timerService = context.timerService();
if (context.timestamp() > timerService.currentWatermark()) {
OUT accumulator = accumulatorState.value();
if (accumulator == null) {
accumulator = acc.createNew();
}
accumulator.setLastModified(context.timestamp());
accumulatorState.update(accumulator);
timerService.registerEventTimeTimer(context.timestamp() + gap);
}
}

@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out) throws Exception {
OUT accumulator = accumulatorState.value();
if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on Race Conditions
accumulatorState.clear();
}
}



On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske <[hidden email]> wrote:
That's correct. Removal of timers is not supported in ProcessFunction. Not sure why this is supported for Triggers.
The common workaround for ProcessFunctions is to register multiple timers and have a ValueState that stores the valid timestamp on which the onTimer method should be executed.
When a timer fires and calls onTimer(), the method first checks whether the timestamp is the correct one and leaves the method if that is not the case.
If you want to fire on the next watermark, another trick is to register multiple timers on (currentWatermark + 1). Since there is only one timer per timestamp, there is only one timer which gets continuously overwritten. The timer is called when the watermark is advanced.

On the performance of the timer service. AFAIK, all methods that work with some kind of timer use this service. So there is not much choice.

2017-12-20 22:36 GMT+01:00 Vishal Santoshi <[hidden email]>:
And that further begs the question.. how performant is Timer Service. I tried to peruse through the architecture behind it but cold not find a definite clue. Is it a Scheduled Service and if yes how many threads etc...

On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <[hidden email]> wrote:
Makes sense. I took a first stab at using a ProcessFunction. The TimerService exposed by the Context does not have a method to remove a timer. Is that primarily because a priority queue is the storage, and removal from a PriorityQueue is expensive? The TriggerContext does expose a version that can remove timers, so I was wondering why there is this dissonance...

On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <[hidden email]> wrote:
Hi Vishal,

it is not guaranteed that add() and onElement() receive the same object, and even if they do it is not guaranteed that a mutation of the object in onElement() has an effect. The object might have been serialized and stored in RocksDB.
Hence, elements should not be modified in onElement().

Have you considered implementing the operation completely in a ProcessFunction instead of a session window?
This might be more code but easier to design and reason about because there is no interaction of window assigner, trigger, and window function.
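The plain-Java illustration below shows why a mutation may not be visible: once an element has gone through a serialize/deserialize round trip, the two sides hold different objects. The sketch uses java.io serialization only as a stand-in. Flink uses its own TypeSerializers for state, and whether a copy is made depends on the state backend, so treat this as an assumption-based model rather than a description of the runtime. The class SerializationCopyModel and its Event type are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationCopyModel {
    // Hypothetical stand-in for the POJO discussed in this thread.
    static class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        long watermark = Long.MIN_VALUE;
    }

    // Write the object to bytes and read it back, roughly what a serializing
    // state backend does between storing an element and handing it out again.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Event heldByAccumulator = new Event();
        Event seenByTrigger = roundTrip(heldByAccumulator); // equal contents, different object
        seenByTrigger.watermark = 42L;                      // the "mutation in onElement()"
        System.out.println(heldByAccumulator == seenByTrigger);            // prints false
        System.out.println(heldByAccumulator.watermark == Long.MIN_VALUE); // prints true
    }
}
```

With an on-heap backend in a local test, both sides may happen to share one reference. That would be consistent with the local-versus-distributed difference reported in this thread, but it is not something to rely on.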


2017-12-18 20:49 GMT+01:00 Vishal Santoshi <[hidden email]>:
I guess https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362

is where we could shape what gets emitted. Again, for us it seems natural to use the WM to materialize micro-batches with "approximate" order (and no, I am not a fan of Spark micro-batches :)). Any pointers on how we could write an implementation that allows "up till WM" emission through a trigger on a session window would be very helpful. In essence, I believe that for any "funnel" analysis this is crucial. 


I know I am simplifying this and there has to be more to it...




On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <[hidden email]> wrote:
The Trigger in this case would be some count-based Trigger... Again, the motive is to keep the state lean, as we want to search for patterns, sorted on event time, in the incoming sessionized (and thus of nondeterministic length) stream...

On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <[hidden email]> wrote:
For example, this would have worked perfectly if it did not complain about MergeableWindow and state. The Session class here encapsulates the trim-up-to-watermark behavior we desire (a reduce() call after telling it the current WM).

public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {

    private static final ValueStateDescriptor<Session> sessionState = new ValueStateDescriptor<>("session", Session.class);

    @Override
    public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {

        ValueState<Session> session = context.windowState().getState(sessionState);
        Session s = session.value() != null ? session.value() : new Session();
        for (Event e : elements) {
            s.add(e);
        }
        s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
        s.reduce();
        out.collect(s);
        session.update(s);
    }

    @Override
    public void clear(Context context) {
        ValueState<Session> session = context.windowState().getState(sessionState);
        session.clear();
    }
}



On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <[hidden email]> wrote:
Hello Fabian, Thank you for the response.
 
I think that does not work, because it is the WM of the window operator that is needed to make deterministic decisions, rather than the WM of an operator that precedes the window. This is doable using a ProcessWindowFunction with state, but only in the case of non-mergeable windows.

The best API option, I think, is a time-based Trigger that fires on every configured progression of the WM, plus a Window implementation that materializes only data up till that WM (it might hold more data, but that data has an event time greater than the WM). I am not sure there is such a built-in option, and thus I was asking for access to the current WM of the window operator, to allow us to handle the "only data up till that WM" range retrieval using some custom data structure.

Regards.




On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <[hidden email]> wrote:
Hi Vishal,

the Trigger is not designed to augment records but just to control when a window is evaluated.
I would recommend to use a ProcessFunction to enrich records with the current watermark before passing them into the window operator.
The context object of the processElement() method gives access to the current watermark and timestamp of a record.

Please note that watermarks are not deterministic but may depend on the order in which parallel inputs are consumed by an operator.

Best, Fabian
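A small model can illustrate the remark about nondeterministic watermarks. It assumes the usual rule that an operator's watermark is the minimum of the latest watermarks of its input channels. WatermarkMinModel and the RECORD marker are hypothetical names. The same per-channel input, interleaved in two different orders, makes one record observe two different watermarks.

```java
import java.util.Arrays;

// Hypothetical model: an operator with several input channels whose watermark is the
// minimum of the latest watermark seen on each channel. No Flink dependency.
class WatermarkMinModel {
    static final long[] RECORD = {-1, 0}; // marker step meaning "a record is processed here"

    private final long[] channelWatermarks; // latest watermark received per input channel
    private long currentWatermark = Long.MIN_VALUE;

    WatermarkMinModel(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // The operator watermark is the minimum over all channels and never moves backwards.
    void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        currentWatermark = Math.max(currentWatermark, min);
    }

    // Replays one interleaving of two channels; returns the watermark the record observes.
    static long watermarkSeenByRecord(long[][] steps) {
        WatermarkMinModel op = new WatermarkMinModel(2);
        long seen = Long.MIN_VALUE;
        for (long[] step : steps) {
            if (step[0] < 0) {
                seen = op.currentWatermark;
            } else {
                op.onWatermark((int) step[0], step[1]);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        // Identical per-channel input, two interleavings.
        long a = watermarkSeenByRecord(new long[][] {{0, 10}, {1, 10}, {0, 30}, {1, 30}, RECORD});
        long b = watermarkSeenByRecord(new long[][] {{0, 10}, {1, 10}, {0, 30}, RECORD, {1, 30}});
        System.out.println(a + " vs " + b); // prints 30 vs 10
    }
}
```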

2017-12-17 16:59 GMT+01:00 Vishal Santoshi <[hidden email]>:
An addendum 

Is the element reference IN in onElement(IN element, ...) of Trigger<IN, ...> the same object as the IN provided to add(IN value) of Accumulator<IN, ...>? It seems that mutations to IN made in onElement() are not visible to the accumulator, which holds a reference to it as the previous element, on the next invocation of add(). This seems to happen only in distributed mode, which makes sense only if these references point to different objects.

The pipeline
.keyBy(keySelector)
.window(EventTimeSessionWindows.<IN>withGap(gap))
.trigger(new CountBasedWMAugmentationTrigger<IN, TimeWindow>(triggerCount))
.aggregate(
    new AggregateFunction<IN, ACC, OUT>() {

        @Override
        public ACC createAccumulator() {
            ACC newInstance = (ACC) accumulator.clone();
            newInstance.resetLocal();
            return newInstance;
        }

        @Override
        public void add(IN value, ACC accumulator) {
            // This method is called before onElement of the Trigger and keeps the reference to the last IN
            accumulator.add(value);
        }
.....

The Trigger

public class CountBasedWMAugmentationTrigger<T extends
        Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window> extends Trigger<T, W> {


    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        // The element T is mutated to carry the watermark
        element.setWaterMark(ctx.getCurrentWatermark());

.




On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <[hidden email]> wrote:
I want to augment a POJO in the Trigger's onElement method; specifically, I want to supply the POJO with the watermark from the TriggerContext. The sequence of execution is:

1. call to add() in the accumulator for the window, which saves the POJO reference in the accumulator.
2. call to onElement() on the Trigger
3. set the watermark on the POJO

The next add() call should see the last reference, including any mutation done in step 3.

That works in a local test case using LocalFlinkMiniCluster, in that the mutation made by onElement() on the POJO is visible in the subsequent add(), but not on a distributed cluster. The specific question I have is whether add() on a supplied accumulator of a window and the onElement() method of the trigger on that window are inline executions on the same thread, or whether there is some serialization/deserialization or IPC that causes this divergence (local versus distributed).

Regards.

















Re: A question about Triggers

Fabian Hueske-2
Hi,

you would not need the ListStateDescriptor. When a ProcessWindowFunction is used, all events that are assigned to a window (IN objects in your case) are stored in an internal ListState.
The Iterable<IN> parameter of the process() method iterates over this internal list state.

So you would have a Trigger that fires when a new watermark is received (or at regular intervals, like every minute) and at the end of the window.
The process() method looks up the current watermark in the Context object, traverses the Iterable<IN> filtering out all events with timestamp > watermark (you would need to enrich the events with the timestamp, which can be done in a ProcessFunction), inserts the remaining ones into a sorted data structure (possibly leveraging the almost-sorted nature of the events), and creates a Session from it.

This is probably less efficient than your ProcessFunction because process() would go over the complete list over and over again and not be able to persist the result of previous invocations.
However, the code should be easier to maintain.

Does that make sense?

Best, Fabian
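Here is a rough plain-Java sketch of what each process() call would do under this suggestion, with a List standing in for the Iterable<IN> that Flink passes in. The Event class and sessionUpToWatermark() are hypothetical, and the "Session" is reduced to the ordered list of event ids.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class WatermarkFilterModel {
    // Hypothetical element: an id plus the event timestamp attached upstream.
    static final class Event {
        final String id;
        final long timestamp;
        Event(String id, long timestamp) { this.id = id; this.timestamp = timestamp; }
    }

    // One firing: a full pass over every buffered element, keeping those at or before the
    // watermark, sorted by event time because the input is only roughly ordered.
    static List<String> sessionUpToWatermark(Iterable<Event> elements, long watermark) {
        List<Event> ready = new ArrayList<>();
        for (Event e : elements) {
            if (e.timestamp <= watermark) { // events after the watermark are skipped, not removed
                ready.add(e);
            }
        }
        ready.sort(Comparator.comparingLong((Event e) -> e.timestamp));
        List<String> path = new ArrayList<>();
        for (Event e : ready) {
            path.add(e.id);
        }
        return path;
    }

    public static void main(String[] args) {
        List<Event> buffered = List.of(new Event("c", 30), new Event("a", 10),
                new Event("d", 50), new Event("b", 20));
        System.out.println(sessionUpToWatermark(buffered, 30)); // prints [a, b, c]
    }
}
```

The trigger only FIREs, without purging, until the end of the window. As a result, each call rescans and re-sorts all buffered elements, which is the inefficiency mentioned above.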

2018-01-05 17:28 GMT+01:00 Vishal Santoshi <[hidden email]>:
Hello Fabian, Thank you for your response. 

I thought about it, and maybe I am missing something obvious here. The code below is what I think you are suggesting. The issue is that the window now holds a list of Sessions (or rather subsets of the Session).

What is required is that on a new watermark we

* sort these Session objects
* get the subset that falls before the new watermark and emit it without purging.

I do not see how the Trigger approach helps us. It does tell us that the watermark has progressed, but to get the subset of the ListState that falls before the watermark, we would need access to the new value of the watermark. That was my initial query.



public class SessionProcessWindow<IN extends HasTime & HasKey, OUT extends SessionState<IN, OUT>> extends ProcessWindowFunction<IN, OUT, String, TimeWindow> {


    OUT toCreateNew;
    Long gap;
    private final ListStateDescriptor<OUT> mergingSetsStateDescriptor;

    public SessionProcessWindow(TypeInformation<OUT> aggregationResultType,
                                OUT toCreateNew) {
        this.toCreateNew = toCreateNew;
        mergingSetsStateDescriptor =
                new ListStateDescriptor<>("sessions", aggregationResultType);
    }

    @Override
    public void process(String s, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception {
        OUT session = toCreateNew.createNew();
        elements.forEach(f -> session.add(f));
        context.windowState().getListState(mergingSetsStateDescriptor).add(session);
    }
}

On Fri, Jan 5, 2018 at 7:35 AM, Fabian Hueske <[hidden email]> wrote:
Hi Vishal,

thanks for sharing your solution!

Looking at this issue again, and at your mail in which you shared your SessionProcessWindow ProcessWindowFunction, I'm wondering why you need the ValueState that prevents the ProcessWindowFunction from being used in a mergeable window.
You could create a new Session object in each invocation of the ProcessWindowFunction and simply keep the elements in the (mergeable) list state of the window.
In that case you would simply need a custom trigger that calls the ProcessWindowFunction when a new watermark arrives. For intermediate calls you just FIRE, and for the final call you FIRE_AND_PURGE to remove the elements from the window's state.
Did you try that?

Best, Fabian
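The trigger logic suggested here can be modeled without Flink as two decisions. The first is which timer to register next: the currentWatermark + 1 trick from earlier in the thread, capped at the end of the window. The second is whether a firing timer is intermediate or final. The enum only mirrors the names of Flink's TriggerResult values, and WatermarkTriggerModel and its methods are hypothetical.

```java
// Hypothetical plain-Java model of a "fire on every watermark, purge at window end" trigger.
class WatermarkTriggerModel {
    // Mirrors the names of Flink's TriggerResult values; this enum is not the Flink class.
    enum Result { FIRE, FIRE_AND_PURGE }

    // What the trigger's event-time callback would return for a timer at `time`.
    static Result onEventTime(long time, long windowMaxTimestamp) {
        if (time >= windowMaxTimestamp) {
            return Result.FIRE_AND_PURGE; // final call: emit, then drop the window's elements
        }
        return Result.FIRE;               // intermediate call on watermark progress: emit, keep elements
    }

    // Timer to register next so the trigger fires on the next watermark advance,
    // but never later than the end of the window.
    static long nextTimer(long currentWatermark, long windowMaxTimestamp) {
        return Math.min(currentWatermark + 1, windowMaxTimestamp);
    }

    public static void main(String[] args) {
        long windowEnd = 1_000;
        long t = nextTimer(500, windowEnd);            // 501
        System.out.println(onEventTime(t, windowEnd)); // prints FIRE
        t = nextTimer(2_000, windowEnd);               // capped at 1000
        System.out.println(onEventTime(t, windowEnd)); // prints FIRE_AND_PURGE
    }
}
```

In an actual Trigger these decisions would presumably sit in onElement() and onEventTime(), using TriggerContext.getCurrentWatermark() and registerEventTimeTimer(). On session windows the trigger also has to support merging (canMerge() and onMerge()), which this sketch leaves out.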



2018-01-03 15:57 GMT+01:00 Vishal Santoshi <[hidden email]>:
Dear Fabian,

I was able to create a fairly functional ProcessFunction. Here is a synopsis; please see if it makes sense.

Sessionization is unusual in that it entails windows of dynamic length. Flink's approach is pretty simple. It creates a TimeWindow of size "gap" relative to the event time, finds an overlapping window (intersection), and creates a covering window. Each such window has a "state" associated with it, which also has to be merged when a covering window is created from the intersection of 2 or more incident windows. To be more precise, if Window1 spans (t1, t2), a new record creates a window (t3, t4), and t1 <= t3 <= t2, then a new window (t1, max(t2, t4)) is created and the associated states are merged. 
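The merge step described above can be sketched in plain Java as interval merging. This is a model, not Flink's implementation. Window, add() and the gap handling are hypothetical, and touching windows count as intersecting in this sketch.

```java
import java.util.ArrayList;
import java.util.List;

class SessionMergeModel {
    // A window [start, end); an event at time t first gets the window [t, t + gap).
    static final class Window {
        final long start;
        final long end;
        Window(long start, long end) { this.start = start; this.end = end; }
        boolean intersects(Window o) { return start <= o.end && o.start <= end; }
        Window cover(Window o) { return new Window(Math.min(start, o.start), Math.max(end, o.end)); }
        @Override public String toString() { return "[" + start + ", " + end + ")"; }
    }

    // Assigns one event: every existing session that intersects the new window is folded into
    // a single covering window (this is where the per-window states would also be merged).
    static List<Window> add(List<Window> sessions, long timestamp, long gap) {
        Window merged = new Window(timestamp, timestamp + gap);
        List<Window> result = new ArrayList<>();
        for (Window w : sessions) {
            if (w.intersects(merged)) {
                merged = merged.cover(w);
            } else {
                result.add(w);
            }
        }
        result.add(merged);
        return result;
    }

    public static void main(String[] args) {
        List<Window> s = new ArrayList<>();
        s = add(s, 0, 10);   // [0, 10)
        s = add(s, 30, 10);  // a second, separate session [30, 40)
        s = add(s, 8, 10);   // [8, 18) overlaps [0, 10), giving [0, 18)
        s = add(s, 15, 10);  // grows it to [0, 25)
        s = add(s, 24, 10);  // [24, 34) reaches both sessions and bridges them
        System.out.println(s); // prints [[0, 40)]
    }
}
```

The last step shows why session state has to be mergeable: a single late-enough record can join two previously separate sessions.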


In the current Window API the states are external and accumulator-based. This approach works for pretty much all cases where the aggregation is accumulative/reducible and does not depend on order, i.e., no ordered list of incoming records needs to be kept and the reduction is to a single aggregated element (think counts, min, max, etc.). In path analysis (and other use cases), however, this approach has drawbacks. Even though our accumulator could keep an ordered list of events, that becomes unreasonable if it is not bounded. One approach that does attempt to bound state is to preemptively analyze paths using the WM as the marker that defines the subset of the state that is safe to analyze. So if we have n events in the window state and m of them fall before the WM, we can safely analyze the m subset, emitting the paths seen and reducing the cumulative state size. There are caveats, though, that I will go into later. 
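In the incremental variant described here, only the m events at or before the WM are analyzed and then dropped from state. A plain-Java sketch might look like the following. ReduceToWatermarkModel is a hypothetical stand-in for the per-key state, a PriorityQueue keeps that state ordered by event time, and the path is reduced to page names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class ReduceToWatermarkModel {
    // Hypothetical element: an event time plus the page that goes into the path.
    static final class Event {
        final long timestamp;
        final String page;
        Event(long timestamp, String page) { this.timestamp = timestamp; this.page = page; }
    }

    // Stand-in for the per-key session state, kept ordered by event time.
    private final PriorityQueue<Event> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));

    void add(Event e) {
        buffer.add(e);
    }

    // Called on watermark progress: the m events at or before the watermark are emitted
    // in event-time order and removed, so only the n - m later events stay in state.
    List<String> reduceToWatermark(long watermark) {
        List<String> emitted = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
            emitted.add(buffer.poll().page);
        }
        return emitted;
    }

    int bufferedCount() {
        return buffer.size();
    }

    public static void main(String[] args) {
        ReduceToWatermarkModel state = new ReduceToWatermarkModel();
        state.add(new Event(30, "checkout"));
        state.add(new Event(10, "home"));
        state.add(new Event(20, "cart"));
        state.add(new Event(50, "home"));
        System.out.println(state.reduceToWatermark(25)); // prints [home, cart]
        System.out.println(state.bufferedCount());       // prints 2
    }
}
```

Unlike a full rescan, each event is emitted once and then leaves the state. Events arriving behind an already-processed watermark would need separate handling; the processElement() shown in this thread simply drops them.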


Unfortunately, the accumulators used by Flink's default window runtime do not have access to the WM. 


This led to the following generic approach (implemented and tested): 


* Use a low-level ProcessFunction, which allows access to the WM and is definitely closer to the guts of Flink.


* Still use the merge-windows-on-intersection approach, but use the WM to trigger (through timers) reductions in state. This is not very different from what Flink does, but we have more control over what to do and when to do it. Essentially, I have exposed a lifecycle method that reacts to WM progression.  


* There are essentially 2 timers. The first timer is at the maxTimestamp() of a window; if there is no further mutation because of merges etc., it fires to signal the end of the session. The second one is at currentWatermark + 1 and essentially calls a "reduceToWM" on each keyed window and thus on its state.


* There are 2 ways to short-circuit a session: 1. on session time span, 2. on session size. 


* There is a safety valve to blacklist keys when it is obvious that the key belongs to a bot (again 


The solution thus preemptively pushes out patterns (and correct patterns) while keeping the ordered state within reasonable bounds. The incoming data of course still has to be analyzed: are the paths too large, etc. But one has full control over how to fashion the solution.




Regards and Thanks,


Vishal









On Wed, Dec 27, 2017 at 10:41 AM, Vishal Santoshi <[hidden email]> wrote:
This makes sense.  Thanks.

On Sat, Dec 23, 2017 at 10:58 AM, Fabian Hueske <[hidden email]> wrote:
Hi,

all calls to onElement() or onTimer() are synchronized, for all keys. Think of a single thread calling these methods.
Event-time timers are called when a watermark passes the timer. Watermarks are received as special records, so the methods are called in the same order as records (actual records or watermarks) arrive at the function. Only for processing-time timers, actual synchronization is required.

The NPE might be thrown because of two timers that fire one after the other without a new record being processed in between the onTimer() calls. In that case the state is cleared in the first call and null in the second.

Best, Fabian
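The NPE scenario described here can be reproduced in a single-threaded plain-Java model of the processElement()/onTimer() code from this thread. NullSafeTimerModel is hypothetical: a field stands in for accumulatorState, with null meaning cleared, and a TreeSet stands in for the registered event-time timers. An out-of-order element makes the earlier timer clear the state, so the later timer finds it null. The guard returns early instead of dereferencing it.

```java
import java.util.TreeSet;

// Hypothetical single-key, single-threaded model of the thread's processElement()/onTimer().
class NullSafeTimerModel {
    private final long gap;
    private Long lastModified;                         // stand-in for accumulatorState; null == cleared
    private final TreeSet<Long> timers = new TreeSet<>(); // one timer per timestamp, fired in order
    int clears = 0;
    int skippedAfterClear = 0;

    NullSafeTimerModel(long gap) { this.gap = gap; }

    void processElement(long timestamp) {
        lastModified = timestamp;      // same overwrite as in the original processElement()
        timers.add(timestamp + gap);
    }

    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    private void onTimer(long timestamp) {
        if (lastModified == null) {    // an earlier timer already cleared the state: the NPE case
            skippedAfterClear++;
            return;
        }
        if (timestamp == lastModified + gap) {
            lastModified = null;       // accumulatorState.clear()
            clears++;
        }
    }

    public static void main(String[] args) {
        NullSafeTimerModel m = new NullSafeTimerModel(10);
        m.processElement(105);   // timer at 115
        m.processElement(100);   // out of order but ahead of the watermark: timer at 110
        m.advanceWatermark(120); // 110 clears the state, then 115 finds it null
        System.out.println(m.clears + " " + m.skippedAfterClear); // prints 1 1
    }
}
```

Separately, the model (like the original code) overwrites lastModified with the most recently arriving timestamp, so an out-of-order element moves it backwards. Keeping the maximum timestamp instead is a design choice worth considering.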

2017-12-23 16:36 GMT+01:00 Vishal Santoshi <[hidden email]>:

Thanks. 




    I have a few follow-up questions regarding ProcessFunction. I think the core should take care of any synchronization issues between calls to onElement and onTimer in the case of a keyed stream, but my tests do not seem to suggest that.



I have 2 specific questions.


1. Are calls to onElement(..) single-threaded when scoped to a key? That is, on a keyed stream, is there a way that 2 or more threads can process more than one element of a single key at the same time? Would I have to synchronize this construction?

OUT accumulator = accumulatorState.value();
if (accumulator == null) {
    accumulator = acc.createNew();
}



2. Can concurrent calls to onTimer(..) and onElement(..) happen for the same key? I intend to clean up state, but I see NullPointerExceptions thrown in onTimer(..), and I presume it is because onElement and onTimer are executed on 2 separate threads, with onTimer removing the state (clear()) after another thread has registered a timer (in onElement).


if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on Race Conditions
    accumulatorState.clear();
}







Re: A question about Triggers

Vishal Santoshi
Yep, though this is suboptimal, as you imagined. Two things:

* <IN> internally has an <INLite>, an ultra-light version of IN that is all the path analysis requires.
* Since sessionization is expensive, we piggyback multiple other aggregations that do not depend on the path or order (counts, etc.). Essentially, a Session is (ordered path + accumulated stats).

The code seems pretty much all right; please tell me if you need to see it. It is all generics, so there are no secrets here.








On Fri, Jan 5, 2018 at 11:58 AM, Fabian Hueske <[hidden email]> wrote:
Hi,

you would not need the ListStateDescriptor. A WindowProcessFunction stores all events that are assigned to a window (IN objects in your case) in an internal ListState.
The Iterable<IN> parameter of the process() method iterates over the internal list state.

So you would have a Trigger that fires when a new watermark is received (or in regular intervals like every minute) and at the end of the window.
The process() method looks up the current watermark in the Context object, traverses the Iterable<IN> filtering out all events with timestamp > watermark (you would need to enrich the events with the timestamp which can be done in a ProcessFunction), inserting the remaining ones into a sorted data structure (possibly leveraging the almost sorted nature of the events) and create a Session from it.

This is probably less efficient than your ProcessFunction because process() would go over the complete list over and over again and not be able to persist the result of previous invocations.
However, the code should be easier to maintain.

Does that make sense?

Best, Fabian

2018-01-05 17:28 GMT+01:00 Vishal Santoshi <[hidden email]>:
Hello Fabian, Thank you for your response. 

                     I thought about it and may be am missing something obvious here. The code below is what I think you suggest. The issue is that the window now is a list of Session's ( or shall subsets of the Session). 

What is required is that on a new watermark

* We sort these Session objects
* Get the subset that are before the new Watermark and an emit without purge.

I do not see how the Trigger approach helps us. It does tell us that the watermark has progressed but to get a subset of the ListState that falls before the watermark, we would need access to the new value  of the watermark. That was what my initial query was. 



public class SessionProcessWindow<IN extends HasTime & HasKey, OUT extends SessionState<IN, OUT>> extends ProcessWindowFunction<IN, OUT, String, TimeWindow> {


OUT toCreateNew;
Long gap;
private final ListStateDescriptor< OUT> mergingSetsStateDescriptor;

public SessionProcessWindow(TypeInformation<OUT> aggregationResultType,
OUT toCreateNew) {
this.toCreateNew = toCreateNew;
mergingSetsStateDescriptor =
new ListStateDescriptor<>("sessions", aggregationResultType);
}
@Override
public void process(String s, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception {
OUT session = toCreateNew.createNew();
elements.forEach(f -> session.add(f));
context.windowState().getListState(mergingSetsStateDescriptor).add(session);
}
}

On Fri, Jan 5, 2018 at 7:35 AM, Fabian Hueske <[hidden email]> wrote:
Hi Vishal,

thanks for sharing your solution!

Looking at this issue again and your mail in which you shared your SessionProcessWindow ProcessWindowFunction, I'm wondering why you need the ValueState that prevents the ProcessWindowFunction to be used in a mergeable window.
You could have created a new Session object in each invocation of the ProcessWindowFucntion and simply keep the elements in the (mergable) list state of the window.
In that case you would simply need a custom trigger that calls the ProcessWindowFunction when a new watermark arrives. For intermediate calls, you just FIRE and for the final call you FIRE_AND_PURGE to remove the elements from the window's state.
Did you try that?

Best, Fabian



2018-01-03 15:57 GMT+01:00 Vishal Santoshi <[hidden email]>:
Dear Fabian,

           I was able to create a pretty functional ProcessFunction and here is the synopsis and please see if it makes sense.

Sessionization is unique as in it entails windows of dynamic length. The way flink approaches is pretty simple. It will create a TimeWindow of size "gap" relative to the event time, find an overlapping window ( intersection ) and create a covering window. Each such window has a "state" associated with it, which too has to be merged when a cover window is created on intersection of 2 or more incident windows.To be more precise if Window1 spans ( t1, t2 ) and a new record creates a window ( t3, t4 ) and  t1<=t3<=t2 a new Window is created ( t1, t4 ) and the associated states are merged. 


In the current Window API the states are external and are Accumulator based. This approach pretty much works for all cases where the aggregation is accumulative/reduced  and does not depend on order, as in no order list of incoming records needs to be kept and reduction is to a single aggregated element ( think counts, min max etc). In path analysis ( and other use cases ) however this approach has drawbacks. Even though in our accumulator we could keep an ordered list of events it becomes unreasonable if not within bounds. An approach that does attempt to bind state, is to preemptively analyze paths using the WM as the marker that defines the subset of the state that is safe to analyze. So if we have n events in the window state and m fall before WM, we can safely analyze the m subset, emitting paths seen and reducing the cumulative state size. There are caveats though that I will go into later. 


Unfortunately the Accumulators in Flink Window runtime defaults do not have access to the WM. 


This lead to this generic approach  ( implemented and tested ) 


* Use a low level ProcessFunction that allows access to WM and definitely nearer to the guts of Flink.


* Still use the merge Windows on intersection approach but use WM to trigger ( through Timers)  reductions in state. This is not very dissimilar to what Flink does but we have more control over what to do and when to do it. Essentially have exposed a lifecycle method that reacts to WM progression.  


* There are essentially 2 Timers. The first timer is the maxTimeStamp() of a Window, which if there is no further mutation b'coz of merge etc will fire to reflect a Session End. The second one is  on currentWaterMark+1 that essentially calls a "reduceToWM" on each keyed Window and thus State.


* There are 2 ways to short circuit a Session 1. On Session time span 2. On Session size. 


* There is a safety valve to blacklist keys when it is obvious that it is a bot ( again 


The solution will thus preemptively push out Patterns ( and correct patterns ) while keeping the ordered state within reasonable bounds. The incident data of course has to be analyzed . Are the paths to large etc. But one has full control over how to fashion the solution.




Regards and Thanks,


Vishal









On Wed, Dec 27, 2017 at 10:41 AM, Vishal Santoshi <[hidden email]> wrote:
This makes sense.  Thanks.

On Sat, Dec 23, 2017 at 10:58 AM, Fabian Hueske <[hidden email]> wrote:
Hi,

all calls to onElement() or onTimer() are synchronized, for all keys. Think of a single thread calling these methods.
Event-time timers are called when a watermark passes the timer. Watermarks are received as special records, so the methods are called in the same order as records (actual records or watermarks) arrive at the function. Actual synchronization is only required for processing-time timers.

The NPE might be thrown because two timers fire one after the other without a new record being processed between the onTimer() calls. In that case the state is cleared in the first call and is null in the second.
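To make the NPE scenario concrete, here is a plain-Java model (not Flink code) of the processElement()/onTimer() pair quoted further down in this thread. `GapTimerModel` is hypothetical and a map stands in for keyed ValueState. Elements at times 5 and then 1 (both still ahead of the watermark) register timers at 15 and 11. When the watermark passes 15, both timers fire on the same thread: the one at 11 clears the state, and the one at 15 then finds it null. A null check at the top of onTimer() handles that case.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Plain-Java model of the processElement()/onTimer() pair. A map stands in for keyed
 * ValueState; timers are fired by calling onTimer() directly, one after another.
 */
final class GapTimerModel {
    static final long GAP = 10;
    // Stand-in for the per-key accumulator's lastModified field.
    final Map<String, Long> lastModified = new HashMap<>();
    int cleared;

    void processElement(String key, long timestamp) {
        // Like the quoted code: overwrite lastModified with the element's timestamp.
        // (In Flink this is also where registerEventTimeTimer(timestamp + GAP) happens.)
        lastModified.put(key, timestamp);
    }

    void onTimer(String key, long timerTimestamp) {
        Long last = lastModified.get(key);
        if (last == null) {
            // An earlier timer already cleared the state; without this check,
            // unboxing null in the comparison below throws a NullPointerException.
            return;
        }
        if (timerTimestamp == last + GAP) {
            lastModified.remove(key);
            cleared++;
        }
    }

    public static void main(String[] args) {
        GapTimerModel model = new GapTimerModel();
        model.processElement("user-1", 5);  // timer at 15
        model.processElement("user-1", 1);  // out of order but ahead of the watermark: timer at 11
        // Watermark passes 15: timers fire in timestamp order on the same thread.
        model.onTimer("user-1", 11);        // 11 == 1 + GAP, state cleared
        model.onTimer("user-1", 15);        // state is null now; the guard returns early
        System.out.println("cleared " + model.cleared + " time(s)");
    }
}
```

Note that in this ordering the state is cleared at 11 although the event at 5 is still within its gap; keeping the maximum timestamp seen (e.g. with Math.max) in processElement() would be one way to avoid that.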

Best, Fabian

2017-12-23 16:36 GMT+01:00 Vishal Santoshi <[hidden email]>:

Thanks. 




I have a few follow-up questions regarding ProcessFunction. I think the core should take care of any synchronization issues between calls to onElement and onTimer in the case of a keyed stream, but my tests do not seem to suggest that.



I have 2 specific questions.


1. Are calls to onElement(..) single-threaded if scoped to a key? That is, on a keyed stream, is there any way that 2 or more threads can process more than one element of a single key at the same time? Would I have to synchronize this construction:

OUT accumulator = accumulatorState.value();
if (accumulator == null) {
    accumulator = acc.createNew();
}



2. Can concurrent calls to onTimer(..) and onElement(..) happen for the same key? I intend to clean up state, but I see NullPointerExceptions thrown in onTimer(..), and I presume it is because onElement and onTimer are executed on 2 separate threads, with onTimer removing the state (clear()) after another thread has registered a timer (in onElement).


if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on Race Conditions
    accumulatorState.clear();
}






PS. This is the full code.



@Override
public void processElement(IN event, Context context, Collector<OUT> out) throws Exception {
    TimerService timerService = context.timerService();
    if (context.timestamp() > timerService.currentWatermark()) {
        OUT accumulator = accumulatorState.value();
        if (accumulator == null) {
            accumulator = acc.createNew();
        }
        accumulator.setLastModified(context.timestamp());
        accumulatorState.update(accumulator);
        timerService.registerEventTimeTimer(context.timestamp() + gap);
    }
}

@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out) throws Exception {
    OUT accumulator = accumulatorState.value();
    if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on Race Conditions
        accumulatorState.clear();
    }
}



On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske <[hidden email]> wrote:
That's correct. Removal of timers is not supported in ProcessFunction. Not sure why this is supported for Triggers.
The common workaround for ProcessFunctions is to register multiple timers and have a ValueState that stores the valid timestamp on which the onTimer method should be executed.
When a timer fires and calls onTimer(), the method first checks whether the timestamp is the correct one and leaves the method if that is not the case.
If you want to fire on the next watermark, another trick is to repeatedly register a timer on (currentWatermark + 1). Since there is only one timer per timestamp (and key), that single timer just gets continuously overwritten. It is called when the watermark is advanced.
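Both workarounds can be modelled in plain Java. This is a sketch, not Flink's timer service: `TimerModel` and its methods are hypothetical, a `TreeSet` stands in for one key's event-time timers (so re-registering the same timestamp yields a single timer), and a field stands in for the ValueState holding the valid timestamp. Timers fire in timestamp order once the watermark reaches them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Plain-Java model of one key's event-time timers; not Flink's actual timer service. */
final class TimerModel {
    // A set, so registering the same timestamp again does not create a second timer.
    private final TreeSet<Long> timers = new TreeSet<>();
    private long currentWatermark = Long.MIN_VALUE;
    // Stand-in for the ValueState that stores the only timestamp onTimer() should act on.
    private Long validTimestamp;
    final List<Long> fired = new ArrayList<>();    // every timer that fired
    final List<Long> handled = new ArrayList<>();  // timers that passed the validity check

    /** Workaround 1: instead of deleting the old timer, remember which one is valid. */
    void rescheduleTo(long timestamp) {
        validTimestamp = timestamp;
        timers.add(timestamp);
    }

    /** Workaround 2: calling this repeatedly still leaves a single timer at currentWatermark + 1. */
    void fireOnNextWatermark() {
        timers.add(currentWatermark + 1);
    }

    int pendingTimers() {
        return timers.size();
    }

    /** Fires, in timestamp order, every timer whose timestamp the watermark has reached. */
    void advanceWatermark(long watermark) {
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    private void onTimer(long timestamp) {
        fired.add(timestamp);
        if (validTimestamp == null || timestamp != validTimestamp) {
            return; // stale timer: leave the method without doing any work
        }
        handled.add(timestamp);
        validTimestamp = null;
    }

    public static void main(String[] args) {
        TimerModel model = new TimerModel();
        model.rescheduleTo(100);
        model.rescheduleTo(150);   // the timer at 100 cannot be deleted, it just becomes stale
        model.advanceWatermark(200);
        System.out.println("fired=" + model.fired + " handled=" + model.handled);
    }
}
```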

On the performance of the timer service: AFAIK, all methods that work with some kind of timer use this service, so there is not much choice.

2017-12-20 22:36 GMT+01:00 Vishal Santoshi <[hidden email]>:
And that further raises the question: how performant is the Timer Service? I tried to look through the architecture behind it but could not find a definite clue. Is it a scheduled service, and if yes, how many threads etc.?

On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <[hidden email]> wrote:
Makes sense. I took a first stab at using ProcessFunction. The TimerService exposed by the Context does not have a remove-timer method. Is that primarily because a priority queue is the storage and removal from a PriorityQueue is expensive? The TriggerContext does expose a version with removal abilities, so I was wondering why this dissonance...

On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <[hidden email]> wrote:
Hi Vishal,

it is not guaranteed that add() and onElement() receive the same object, and even if they do it is not guaranteed that a mutation of the object in onElement() has an effect. The object might have been serialized and stored in RocksDB.
Hence, elements should not be modified in onElement().
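A small plain-Java illustration of why the mutation can get lost: once an element has been serialized into state (as with RocksDB), later changes to some in-memory copy do not reach the stored bytes. `WmEvent` and `SerializedStore` are hypothetical, and Java serialization merely stands in for Flink's own serializers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical element type that a trigger would try to stamp with the watermark. */
final class WmEvent implements Serializable {
    private static final long serialVersionUID = 1L;
    long waterMark = Long.MIN_VALUE;
}

/** Stand-in for a state backend that keeps values only in serialized form. */
final class SerializedStore {
    private byte[] bytes;

    void put(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        bytes = buffer.toByteArray();
    }

    Object get() {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        WmEvent element = new WmEvent();
        SerializedStore windowState = new SerializedStore();
        windowState.put(element);          // element added to window state (serialized)
        element.waterMark = 42;            // later mutation, as in onElement()
        WmEvent restored = (WmEvent) windowState.get();
        System.out.println(restored.waterMark == Long.MIN_VALUE); // true: the mutation never reached the state
    }
}
```

With a purely local setup the same object reference may happen to be shared, which would explain why the mutation appeared to work on LocalFlinkMiniCluster; that is not something to rely on.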

Have you considered implementing the operation completely in a ProcessFunction instead of a session window?
This might be more code but easier to design and reason about because there is no interaction of window assigner, trigger, and window function.


2017-12-18 20:49 GMT+01:00 Vishal Santoshi <[hidden email]>:
I guess https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362

is where we could shape what is emitted. Again, for us it seems natural to use the WM to materialize micro-batches with "approximate" order (and no, I am not a fan of Spark micro-batches :)). Any pointers on how we could write an implementation that allows "up till WM" emission through a trigger on a session window would be very helpful. In essence, I believe it is crucial for any "funnel" analysis.


I know I am simplifying this and there has to be more to it...




On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <[hidden email]> wrote:
The Trigger in this case would be some count-based trigger... Again, the motive is to keep the state lean, as we want to search for patterns, sorted on event time, in the incoming sessionized (and thus of nondeterministic length) stream...

On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <[hidden email]> wrote:
For example, this would have worked perfectly if it did not complain about mergeable windows and state. The Session class here encapsulates the trim-up-to-watermark behavior (a reduce() call after telling it the current WM) that we desire:

public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {

    private static final ValueStateDescriptor<Session> sessionState = new ValueStateDescriptor<>("session", Session.class);

    @Override
    public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {

        ValueState<Session> session = context.windowState().getState(sessionState);
        Session s = session.value() != null ? session.value() : new Session();
        for (Event e : elements) {
            s.add(e);
        }
        s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
        s.reduce();
        out.collect(s);
        session.update(s);
    }

    @Override
    public void clear(Context context) {
        ValueState<Session> session = context.windowState().getState(sessionState);
        session.clear();
    }
}



On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <[hidden email]> wrote:
Hello Fabian, Thank you for the response.
 
I think that does not work, as it is the WM of the window operator that is needed to make deterministic decisions, rather than the WM of an operator that precedes the window. This is doable using a ProcessWindowFunction with state, but only in the case of non-mergeable windows.

The best API option, I think, is a TimeBaseTrigger that fires on every configured progression of the WM, plus a window implementation that materializes only the data up till that WM (the window might hold more data, but that data has an event time greater than the WM). I am not sure there is such a built-in option, and thus I was asking for access to the current WM of the window operator, so that we can handle the "only data up till that WM" range retrieval using some custom data structure.

Regards.




On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <[hidden email]> wrote:
Hi Vishal,

the Trigger is not designed to augment records but just to control when a window is evaluated.
I would recommend using a ProcessFunction to enrich records with the current watermark before passing them into the window operator.
The context object of the processElement() method gives access to the current watermark and timestamp of a record.

Please note that watermarks are not deterministic but may depend on the order in which parallel inputs are consumed by an operator.
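The non-determinism comes from how an operator with several input channels derives its watermark: it takes the minimum over the latest watermark of each channel. The plain-Java model below (hypothetical `MultiInputWatermark`; idle-channel handling and the rest of Flink's implementation are left out) shows that the watermark a record observes depends on whether another channel's watermark arrived before or after it.

```java
import java.util.Arrays;

/** Plain-Java model of an operator watermark derived from several input channels. */
final class MultiInputWatermark {
    private final long[] channelWatermarks;
    private long current = Long.MIN_VALUE;

    MultiInputWatermark(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel and returns the operator's watermark. */
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        current = Math.max(current, min); // the operator watermark never moves backwards
        return current;
    }

    long current() {
        return current;
    }

    public static void main(String[] args) {
        // Order A: both channel watermarks arrive before a record from channel 0.
        MultiInputWatermark a = new MultiInputWatermark(2);
        a.onWatermark(0, 10);
        a.onWatermark(1, 20);
        System.out.println("record sees " + a.current()); // 10

        // Order B: the record from channel 0 is consumed before channel 1's watermark.
        MultiInputWatermark b = new MultiInputWatermark(2);
        b.onWatermark(0, 10);
        System.out.println("record sees " + b.current()); // Long.MIN_VALUE
    }
}
```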

Best, Fabian

2017-12-17 16:59 GMT+01:00 Vishal Santoshi <[hidden email]>:
An addendum 

Is the element reference IN in onElement(IN element, ..) in Trigger<IN, ..> the same as the IN provided to add(IN value) in Accumulator<IN, ..>? It seems that mutations to IN in onElement() are not visible to the accumulator that holds it as a previous element reference, even in the next invocation of add(). This seems to happen only in distributed mode, which makes sense only if these references point to different objects.

The pipeline
.keyBy(keySelector)
.window(EventTimeSessionWindows.<IN>withGap(gap))
.trigger(new CountBasedWMAugmentationTrigger<IN,TimeWindow>(triggerCount))
.aggregate(
    new AggregateFunction<IN, ACC, OUT>() {

        @Override
        public ACC createAccumulator() {
            ACC newInstance = (ACC) accumulator.clone();
            newInstance.resetLocal();
            return newInstance;
        }

        @Override
        public void add(IN value, ACC accumulator) {
            /** This method is called before onElement of the Trigger and keeps the reference to the last IN **/
            accumulator.add(value);
        }
        .....

The Trigger

public class CountBasedWMAugmentationTrigger<T extends
        Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window> extends Trigger<T, W> {

    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        /** The element T is mutated to carry the watermark **/
        element.setWaterMark(ctx.getCurrentWatermark());

.




On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <[hidden email]> wrote:
I want to augment a POJO in the Trigger's onElement method, specifically supply the POJO with the watermark from the TriggerContext. The sequence of execution is:

1. call to add() in the accumulator for the window, and save the POJO reference in the accumulator.
2. call to onElement on the Trigger
3. set the watermark on the POJO

The next add() method should have the last reference and any mutation done in step 3.

That works in a local test case using LocalFlinkMiniCluster (I have access to the mutation made by onElement() to the POJO in the subsequent add()), but not on a distributed cluster. The specific question I have is whether add() on a window's accumulator and the onElement() method of the trigger on that window are inline executions on the same thread, or whether there is some serialization/deserialization or IPC that causes this divergence (local versus distributed).

Regards.


Re: A question about Triggers

Fabian Hueske-2
I think I got it
Glad you solved this tricky issue and thanks for sharing your solution :-)

Best, Fabian


2018-01-06 14:33 GMT+01:00 Vishal Santoshi <[hidden email]>:
Yep, this, though, is suboptimal, as you imagined. Two things:

* <IN> internally has an <INLite>, an ultra-lite version of IN that is only required for the path analysis.
* Since sessionization is expensive, we piggyback multiple other aggregations that do not depend on the path or order (count etc.). Essentially, a Session is (ordered path + accumulated stats).

The code seems pretty all right; please tell me if you need to see it. It is all generics, so no secrets here.








On Fri, Jan 5, 2018 at 11:58 AM, Fabian Hueske <[hidden email]> wrote:
Hi,

you would not need the ListStateDescriptor. A ProcessWindowFunction stores all events that are assigned to a window (IN objects in your case) in an internal ListState.
The Iterable<IN> parameter of the process() method iterates over the internal list state.

So you would have a Trigger that fires when a new watermark is received (or in regular intervals like every minute) and at the end of the window.
The process() method looks up the current watermark in the Context object, traverses the Iterable<IN> filtering out all events with timestamp > watermark (you would need to enrich the events with the timestamp, which can be done in a ProcessFunction), inserts the remaining ones into a sorted data structure (possibly leveraging the almost sorted nature of the events), and creates a Session from it.
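The process() step described above could look roughly like this, written against plain Java collections rather than the Flink API. `TimedEvent`, `WatermarkSessionBuilder` and `buildPath` are hypothetical, and in a real ProcessWindowFunction the watermark would come from the Context. It re-scans the whole window content on every call, which is the inefficiency Fabian points out next.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical window element that already carries its event-time timestamp. */
final class TimedEvent {
    final long timestamp;
    final String page;

    TimedEvent(long timestamp, String page) {
        this.timestamp = timestamp;
        this.page = page;
    }
}

final class WatermarkSessionBuilder {
    /** Keeps only events at or before the watermark and returns their pages in event-time order. */
    static List<String> buildPath(Iterable<TimedEvent> elements, long watermark) {
        List<TimedEvent> safe = new ArrayList<>();
        for (TimedEvent e : elements) {
            if (e.timestamp <= watermark) {
                safe.add(e);   // events after the watermark are skipped, not removed from the window
            }
        }
        // ArrayList.sort delegates to Arrays.sort (TimSort for objects), which is fast on almost-sorted input.
        safe.sort(Comparator.comparingLong((TimedEvent e) -> e.timestamp));
        List<String> path = new ArrayList<>();
        for (TimedEvent e : safe) {
            path.add(e.page);
        }
        return path;
    }

    public static void main(String[] args) {
        List<TimedEvent> window = Arrays.asList(
                new TimedEvent(10, "home"), new TimedEvent(30, "checkout"),
                new TimedEvent(20, "cart"), new TimedEvent(50, "exit"));
        System.out.println(buildPath(window, 35)); // [home, cart, checkout]
    }
}
```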

This is probably less efficient than your ProcessFunction because process() would go over the complete list over and over again and not be able to persist the result of previous invocations.
However, the code should be easier to maintain.

Does that make sense?

Best, Fabian

2018-01-05 17:28 GMT+01:00 Vishal Santoshi <[hidden email]>:
Hello Fabian, Thank you for your response. 

I thought about it and maybe I am missing something obvious here. The code below is what I think you suggest. The issue is that the window state is now a list of Sessions (or rather, subsets of the Session).

What is required is that, on a new watermark, we

* sort these Session objects, and
* get the subset that falls before the new watermark and emit it without purging.

I do not see how the Trigger approach helps us. It does tell us that the watermark has progressed, but to get the subset of the ListState that falls before the watermark, we would need access to the new value of the watermark. That is what my initial query was about.



public class SessionProcessWindow<IN extends HasTime & HasKey, OUT extends SessionState<IN, OUT>> extends ProcessWindowFunction<IN, OUT, String, TimeWindow> {

    OUT toCreateNew;
    Long gap;
    private final ListStateDescriptor<OUT> mergingSetsStateDescriptor;

    public SessionProcessWindow(TypeInformation<OUT> aggregationResultType,
                                OUT toCreateNew) {
        this.toCreateNew = toCreateNew;
        mergingSetsStateDescriptor =
                new ListStateDescriptor<>("sessions", aggregationResultType);
    }

    @Override
    public void process(String s, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception {
        OUT session = toCreateNew.createNew();
        elements.forEach(f -> session.add(f));
        context.windowState().getListState(mergingSetsStateDescriptor).add(session);
    }
}

On Fri, Jan 5, 2018 at 7:35 AM, Fabian Hueske <[hidden email]> wrote:
Hi Vishal,

thanks for sharing your solution!

Looking at this issue again and at your mail in which you shared your SessionProcessWindow ProcessWindowFunction, I'm wondering why you need the ValueState that prevents the ProcessWindowFunction from being used in a mergeable window.
You could create a new Session object in each invocation of the ProcessWindowFunction and simply keep the elements in the (mergeable) list state of the window.
In that case you would only need a custom trigger that calls the ProcessWindowFunction when a new watermark arrives. For intermediate calls you just FIRE, and for the final call you FIRE_AND_PURGE to remove the elements from the window's state.
Did you try that?
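The suggested trigger behaviour (FIRE on every new watermark, FIRE_AND_PURGE at the end of the window) can be simulated in plain Java. In this sketch `PerWatermarkTriggerSim` and the local `Decision` enum are hypothetical; the enum only mirrors the names of Flink's TriggerResult values. The "fire on every watermark" part uses the currentWatermark + 1 timer trick discussed in this thread, re-registered on every firing. A real trigger on merging session windows would also have to implement canMerge()/onMerge(), which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Local enum that only mirrors the names of Flink's TriggerResult constants.
enum Decision { CONTINUE, FIRE, FIRE_AND_PURGE }

/** Plain-Java simulation of a trigger that fires on every watermark and purges at window end. */
final class PerWatermarkTriggerSim {
    private final long windowMaxTimestamp;
    private final TreeSet<Long> timers = new TreeSet<>();  // this window's event-time timers
    private long currentWatermark = Long.MIN_VALUE;
    final List<String> log = new ArrayList<>();            // "timerTimestamp:decision" entries

    PerWatermarkTriggerSim(long windowMaxTimestamp) {
        this.windowMaxTimestamp = windowMaxTimestamp;
    }

    /** Called per element: schedule the final firing and the next-watermark firing. */
    Decision onElement() {
        timers.add(windowMaxTimestamp);
        timers.add(currentWatermark + 1);
        return Decision.CONTINUE;
    }

    private Decision onEventTime(long time) {
        if (time >= windowMaxTimestamp) {
            return Decision.FIRE_AND_PURGE;   // end of window: emit and drop the elements
        }
        timers.add(currentWatermark + 1);     // re-arm for the next watermark
        return Decision.FIRE;                 // intermediate result, elements are kept
    }

    void advanceWatermark(long watermark) {
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long time = timers.pollFirst();
            Decision decision = onEventTime(time);
            log.add(time + ":" + decision);
            if (decision == Decision.FIRE_AND_PURGE) {
                timers.clear();               // window is done, drop its remaining timers
            }
        }
    }

    public static void main(String[] args) {
        PerWatermarkTriggerSim sim = new PerWatermarkTriggerSim(99);
        sim.advanceWatermark(0);
        sim.onElement();
        sim.advanceWatermark(40);
        sim.advanceWatermark(80);
        sim.advanceWatermark(120);
        System.out.println(sim.log); // [1:FIRE, 41:FIRE, 81:FIRE, 99:FIRE_AND_PURGE]
    }
}
```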

Best, Fabian



2018-01-03 15:57 GMT+01:00 Vishal Santoshi <[hidden email]>:
Dear Fabian,

I was able to create a pretty functional ProcessFunction. Here is the synopsis; please see if it makes sense.

Sessionization is unique in that it entails windows of dynamic length. Flink's approach is pretty simple: it creates a TimeWindow of size "gap" relative to the event time, finds an overlapping window (intersection), and creates a covering window. Each such window has a "state" associated with it, which also has to be merged when a covering window is created from the intersection of 2 or more incident windows. To be more precise, if Window1 spans (t1, t2), a new record creates a window (t3, t4), and t1 <= t3 <= t2, then a new window (t1, t4) is created and the associated states are merged.
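The merge rule in the paragraph above can be sketched in plain Java. `Span` and `SessionMerger` are hypothetical stand-ins: every event opens a window of size "gap", and that window absorbs all existing windows it intersects, producing the covering window. Touching windows are treated as intersecting here, and merging the associated states is not modelled.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical (start, end) window, modelled loosely on a TimeWindow. */
final class Span {
    final long start;
    final long end;

    Span(long start, long end) {
        this.start = start;
        this.end = end;
    }

    boolean intersects(Span other) {
        return start <= other.end && other.start <= end;
    }

    Span cover(Span other) {
        return new Span(Math.min(start, other.start), Math.max(end, other.end));
    }

    @Override
    public String toString() {
        return "(" + start + ", " + end + ")";
    }
}

final class SessionMerger {
    /**
     * Opens a gap-sized window for the event and merges it with every window it intersects.
     * The windows passed in are assumed pairwise disjoint (addEvent maintains that), so one pass suffices.
     */
    static List<Span> addEvent(List<Span> windows, long eventTime, long gap) {
        Span merged = new Span(eventTime, eventTime + gap);
        List<Span> result = new ArrayList<>();
        for (Span w : windows) {
            if (w.intersects(merged)) {
                merged = merged.cover(w);   // absorb the intersecting window
            } else {
                result.add(w);              // untouched window survives as-is
            }
        }
        result.add(merged);
        result.sort(Comparator.comparingLong((Span s) -> s.start));
        return result;
    }

    public static void main(String[] args) {
        List<Span> windows = addEvent(new ArrayList<>(), 0, 30); // (t1, t2) = (0, 30)
        windows = addEvent(windows, 20, 30);                     // (t3, t4) = (20, 50), t1 <= t3 <= t2
        System.out.println(windows);                             // [(0, 50)]
    }
}
```

An event that lands between two sessions can bridge them, which is why a single new window may absorb several existing ones.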


In the current Window API the states are external and accumulator based. This approach works for pretty much all cases where the aggregation is accumulative/reduced and does not depend on order, i.e. no ordered list of incoming records needs to be kept and the reduction is to a single aggregated element (think counts, min, max etc.). In path analysis (and other use cases), however, this approach has drawbacks. Even though our accumulator could keep an ordered list of events, that becomes unreasonable if the list is not bounded. An approach that does attempt to bound state is to preemptively analyze paths using the WM as the marker that defines the subset of the state that is safe to analyze. So if we have n events in the window state and m fall before the WM, we can safely analyze the m subset, emitting the paths seen and reducing the cumulative state size. There are caveats, though, that I will go into later. 


Unfortunately, the accumulators in Flink's default window runtime do not have access to the WM. 


This led to the following generic approach (implemented and tested):


* Use a low-level ProcessFunction, which allows access to the WM and is definitely nearer to the guts of Flink.


* Still use the merge-windows-on-intersection approach, but use the WM to trigger (through timers) reductions in state. This is not very different from what Flink does, but we have more control over what to do and when to do it. Essentially, we have exposed a lifecycle method that reacts to WM progression.


* There are essentially 2 timers. The first timer is at the maxTimestamp() of a window; if there is no further mutation because of a merge etc., it fires to signal the end of the session. The second one is at currentWatermark + 1 and essentially calls a "reduceToWM" on each keyed window and thus its state.


* There are 2 ways to short-circuit a session: 1. on session time span, 2. on session size.


* There is a safety valve to blacklist keys when it is obvious that it is a bot ( again 


The solution thus preemptively pushes out patterns (and correct patterns) while keeping the ordered state within reasonable bounds. The incident data of course has to be analyzed: are the paths too large, etc. But one has full control over how to fashion the solution.




Regards and Thanks,


Vishal








