global window trigger

global window trigger

jad mad
I'm using a global window with
a custom event-time trigger that fires every minute,
and then applying a custom window function to it.

The trigger firing seems to work, but the element collection
I get inside my custom WindowFunction always contains
the whole input from start to end, rather than
the subset of input from the start up to the end of each 1-minute window (maxTimestamp).

Is this because GlobalWindows is a processing-time operator that
does not work with event time?

thanks a lot,



Re: global window trigger

prashantnayak
Hi

We have custom operators using global windows and are using event time.

How are you specifying event time as the time characteristic?

Prashant

Re: global window trigger

jad mad
Hi Prashant,

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Actually, I could get my custom trigger to fire periodically.
The problem is that the set of elements in the iterable
is always the same, which is not what I'm expecting...

private static class MyWindowFunction_Window...
    ...
    @Override
    public void apply(Tuple tuple, W window, Iterable<MyClass> iterable,
        ...
        for (MyClass element : iterable)

does anyone have any idea on this?
thanks a lot in advance,
jad


On Thu, Jul 13, 2017 at 10:55 AM, prashantnayak <[hidden email]> wrote:
Hi

We have custom operators using global windows and are using event time.

How are you specifying event time as the time characteristic?

Prashant



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/global-window-trigger-tp14206p14239.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.


Re: global window trigger

Aljoscha Krettek
Window contents are only purged from state if the Trigger says so or if the watermark passes the garbage collection horizon for a given window. With GlobalWindows the GC horizon is never reached, which leaves Triggers.

You can create a Trigger that purges every time it fires by wrapping it in a PurgingTrigger, e.g.

.trigger(PurgingTrigger.of(<my trigger>))
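The difference between firing and firing-and-purging can be illustrated with a small plain-Java model. This is a sketch, not Flink code: `Result`, `ModelTrigger`, `EveryN`, `Purging` and `run` are hypothetical names, and the `Purging` wrapper only mimics the PurgingTrigger behaviour described here (turning every FIRE into FIRE_AND_PURGE).

```java
import java.util.ArrayList;
import java.util.List;

class TriggerPurgeModel {
    // Model of the three outcomes discussed here; NOT Flink's TriggerResult.
    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    // Minimal trigger interface for the model: decides after each element.
    interface ModelTrigger {
        Result onElement(int bufferedCount);
    }

    // Fires whenever the buffer size reaches a multiple of n, never purges.
    static final class EveryN implements ModelTrigger {
        private final int n;
        EveryN(int n) { this.n = n; }
        public Result onElement(int bufferedCount) {
            return bufferedCount % n == 0 ? Result.FIRE : Result.CONTINUE;
        }
    }

    // Wraps another trigger and upgrades every FIRE to FIRE_AND_PURGE.
    static final class Purging implements ModelTrigger {
        private final ModelTrigger inner;
        Purging(ModelTrigger inner) { this.inner = inner; }
        public Result onElement(int bufferedCount) {
            Result r = inner.onElement(bufferedCount);
            return r == Result.FIRE ? Result.FIRE_AND_PURGE : r;
        }
    }

    // Feeds `elements` into a single window buffer and records how many
    // elements the window function would see at each firing.
    static List<Integer> run(ModelTrigger trigger, int elements) {
        List<Integer> buffer = new ArrayList<>();
        List<Integer> seenAtFiring = new ArrayList<>();
        for (int i = 0; i < elements; i++) {
            buffer.add(i);
            Result r = trigger.onElement(buffer.size());
            if (r == Result.FIRE || r == Result.FIRE_AND_PURGE) {
                seenAtFiring.add(buffer.size());
            }
            if (r == Result.FIRE_AND_PURGE) {
                buffer.clear();   // contents are dropped after this firing
            }
        }
        return seenAtFiring;
    }
}
```

With a plain FIRE every three elements, the window function sees 3, 6 and 9 elements (cumulative contents); with the purging wrapper it sees 3 each time, because the buffer is cleared after every firing.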

Best,
Aljoscha

On 13. Jul 2017, at 14:00, jad mad <[hidden email]> wrote:


Re: global window trigger

jad mad
Hi Aljoscha

Thanks for the comment.
Is wrapping a trigger in PurgingTrigger.of() the same as doing "return TriggerResult.FIRE_AND_PURGE;"
inside a custom trigger?

I gave it a test, and the result seems to be the opposite of what I meant...
Instead of throwing away previous windows' contents, I want to keep them
all the way to the end,
so that I can get cumulative counts over all input.

I wonder how to achieve this.
Anyone?

jad


On Fri, Jul 14, 2017 at 12:16 AM, Aljoscha Krettek <[hidden email]> wrote:

Re: global window trigger

Aljoscha Krettek
Hi,

Ok, then I misunderstood. Yes, a PurgingTrigger is the same as always returning FIRE_AND_PURGE instead of FIRE in a custom Trigger. I thought your problem was that data is never cleared away when using GlobalWindows. Is that not the case?

Best,
Aljoscha
On 14. Jul 2017, at 16:29, jad mad <[hidden email]> wrote:


Re: global window trigger

Aljoscha Krettek
Hi,

I’m afraid this will not work well because a WindowAssigner should be stateless, i.e. it should not keep any state in fields. The reason is that there can be several WindowAssigner instances used on different partitions, and the order in which a WindowAssigner sees the incoming elements is also not guaranteed. That is, you might set a timestamp in the “first_timestamp” field that is not chronologically the “first timestamp”.
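The arrival-order problem can be reproduced without Flink. In this hypothetical plain-Java model, `FirstSeenAssigner` stands in for an assigner that stores the first timestamp it happens to see in a field, and `firstSeen` feeds it one partition's elements in arrival order. Timestamps are milliseconds after 2017-01-01 00:00:00; whether this explains the 00:17:39 start times in the results reported further down is an assumption, not something the model proves.

```java
class StatefulAssignerModel {
    // Hypothetical stand-in for an assigner that keeps the first timestamp it
    // sees in an instance field (the pattern discussed above).
    static final class FirstSeenAssigner {
        private long firstTimestamp = -1L;

        long windowStartFor(long timestamp) {
            if (firstTimestamp == -1L) {
                firstTimestamp = timestamp;   // depends on arrival order, not on event time
            }
            return firstTimestamp;
        }
    }

    // Feeds one partition's timestamps, in arrival order, to a fresh instance
    // and returns the window start it ends up using.
    static long firstSeen(long[] arrivalOrder) {
        FirstSeenAssigner assigner = new FirstSeenAssigner();
        long start = -1L;
        for (long ts : arrivalOrder) {
            start = assigner.windowStartFor(ts);
        }
        return start;
    }

    // The chronologically first timestamp, for comparison.
    static long chronologicalFirst(long[] arrivalOrder) {
        long min = Long.MAX_VALUE;
        for (long ts : arrivalOrder) {
            min = Math.min(min, ts);
        }
        return min;
    }
}
```

If an element stamped 00:17:39 (1,059,000 ms) happens to arrive first, the field records it even though an element from 00:00:00 exists; and two instances fed different partitions each end up with their own "first" timestamp.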

The reason your windows are being purged is probably the allowed lateness, which is zero by default. When the watermark passes the end of a window plus the allowed lateness the window contents are being purged. You can configure the allowed lateness via WindowedStream.allowedLateness(). You should be careful, though, because if you set this too high you might never clean up your window state and therefore end up with ever-growing state.
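A minimal plain-Java model of this clean-up rule, assuming (as described here) that window state is dropped once the watermark reaches the window's max timestamp plus the allowed lateness. The overflow guard and the choice never to clean up a window whose clean-up time is Long.MAX_VALUE are assumptions made for the sketch; `CleanupModel` is a hypothetical name, not part of Flink. In an actual job the lateness itself is set with WindowedStream.allowedLateness(...).

```java
class CleanupModel {
    // Time at which an event-time window's state would be cleaned up in this model:
    // the window's max timestamp plus the allowed lateness.
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long t = windowMaxTimestamp + allowedLateness;
        // If the addition overflows (e.g. a window ending at Long.MAX_VALUE),
        // treat the clean-up time as "end of time" instead of wrapping negative.
        return t >= windowMaxTimestamp ? t : Long.MAX_VALUE;
    }

    // True if, at the given watermark, the window's contents are already gone.
    static boolean isPurged(long windowMaxTimestamp, long allowedLateness, long watermark) {
        long cleanup = cleanupTime(windowMaxTimestamp, allowedLateness);
        if (cleanup == Long.MAX_VALUE) {
            return false;   // assumption: no clean-up is ever scheduled for "end of time"
        }
        return watermark >= cleanup;
    }
}
```

For a one-minute TimeWindow [0, 60000), whose max timestamp is 59999, zero lateness means the contents are dropped as soon as the watermark reaches 59999; with 30 s of lateness they survive until 89999. A window ending at Long.MAX_VALUE, like the single GlobalWindow, is never cleaned up in this model.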

Best,
Aljoscha

On 18. Jul 2017, at 15:05, jad mad <[hidden email]> wrote:

Aljoscha,

What a great answer, and this is what I'd expected!

As a workaround I've modified SlidingEventTimeWindows a little into the custom WindowAssigner below.
The differences are:
1. storing the first timestamp in a variable "first_timestamp",
2. using this timestamp as the start time of all following windows.
@PublicEvolving
public class MySlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;
    private final long size;
    private final long slide;
    private final long offset;
    private long first_timestamp = -1L; // added by me!

    protected MySlidingEventTimeWindows(long size, long slide, long offset) {
        if (offset >= 0L && offset < slide && size > 0L) {
            this.size = size;
            this.slide = slide;
            this.offset = offset;
        } else {
            throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
        }
    }

    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp <= Long.MIN_VALUE) {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
        } else {
            if (this.first_timestamp == -1L) { // added by me!
                this.first_timestamp = timestamp;
                System.out.println("===================== " + this.first_timestamp + " ========================");
            }
            List<TimeWindow> windows = new ArrayList<>((int) (this.size / this.slide));
            long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, this.offset, this.slide);

            for (long start = lastStart; start > timestamp - this.size; start -= this.slide) {
                //windows.add(new TimeWindow(start, start + this.size)); // original implementation
                windows.add(new TimeWindow(this.first_timestamp, start + this.size)); // modified by me!
            }
            return windows;
        }
    }
}
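For comparison, the window assignment performed by the unmodified loop can be re-implemented in plain Java. This sketch re-implements the start-time arithmetic instead of calling Flink; the formula is assumed to match TimeWindow.getWindowStartWithOffset, and `SlidingAssignModel` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingAssignModel {
    // Start of the slide-aligned window containing `timestamp`; same arithmetic as
    // assumed for TimeWindow.getWindowStartWithOffset, re-implemented here.
    static long windowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // All [start, end) windows containing `timestamp`, following the original
    // (unmodified) loop: step back by `slide` while the window still covers it.
    static List<long[]> assign(long timestamp, long size, long slide, long offset) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = windowStartWithOffset(timestamp, offset, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

With size 10, slide 5 and offset 0, an element at t = 12 is assigned to [10, 20) and [5, 15). In the modified version, both windows would instead start at first_timestamp and differ only in their end.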
The result I get from MyWindowFunction(...) looks like this:
2017-01-01 00:17:39 2017-01-01 00:00:01 2
2017-01-01 00:17:39 2017-01-01 00:00:02 4
2017-01-01 00:17:39 2017-01-01 00:00:03 4
2017-01-01 00:17:39 2017-01-01 00:00:04 10
2017-01-01 00:17:39 2017-01-01 00:00:05 19
2017-01-01 00:17:39 2017-01-01 00:00:06 19
2017-01-01 00:17:39 2017-01-01 00:00:07 20
2017-01-01 00:17:39 2017-01-01 00:00:08 23
2017-01-01 00:17:39 2017-01-01 00:00:09 21
2017-01-01 00:17:39 2017-01-01 00:00:10 7
2017-01-01 00:17:39 2017-01-01 00:00:11 2
2017-01-01 00:17:39 2017-01-01 00:00:12 5
2017-01-01 00:17:39 2017-01-01 00:00:13 12
2017-01-01 00:17:39 2017-01-01 00:00:14 17
2017-01-01 00:17:39 2017-01-01 00:00:15 9
2017-01-01 00:17:39 2017-01-01 00:00:16 8

Things I don't understand:
1. When the first line of my input has the timestamp 2017-01-01 00:00:00, why does 2017-01-01 00:17:39 show up in my result as
     each sliding window's start time?
    Basically, I'm just printing out the timestamp of the first element of the iterable in MyWindowFunction.
2. I made MyWindowAssigner in the hope that the start time would be fixed and the contents would not be purged.
    However, the results show that it works just like a normal SlidingEventTimeWindows, with contents
    being purged.
    How can I make it keep its window contents even after each firing?
3. This MyWindowAssigner(...) attempt was based on your previous advice about using something other than
    GlobalWindows. I wonder whether I'm heading in the right direction.
    
thank you very much!
jad

On Mon, Jul 17, 2017 at 7:22 PM, Aljoscha Krettek <[hidden email]> wrote:
Ah, I see. The problem is that the watermark has slightly tricky semantics: a watermark T says that there will be no elements with a timestamp <= T in the future. It does not say that there have not yet been elements with a timestamp > T. In your specific case, this means that the GlobalWindow will contain elements whose timestamps are after the firing timestamp of your trigger. If you want elements to be put into buckets based on their timestamps, you need to use a different WindowAssigner, because GlobalWindows simply puts every element into the same bucket (window).
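A small plain-Java simulation illustrates this. It is a sketch under the assumption that the watermark is simply the largest timestamp seen so far minus a fixed delay (similar in spirit to a bounded-out-of-orderness watermark); `WatermarkModel` and its method are hypothetical. Timestamps are seconds after 2017-07-16 00:00:00, taken from the input example (00:00:01, 00:00:12, 01:03:06, 02:20:10), and the trigger is set for 01:00:00 (3600).

```java
import java.util.ArrayList;
import java.util.List;

class WatermarkModel {
    // Simulates one GlobalWindow buffer and a watermark that trails the largest
    // timestamp seen so far by `maxDelay`. When the watermark first reaches
    // `triggerTime`, returns how many buffered elements are stamped AFTER it;
    // returns -1 if the watermark never gets there.
    static int elementsPastTriggerTime(long[] arrivals, long maxDelay, long triggerTime) {
        List<Long> buffer = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        for (long ts : arrivals) {
            buffer.add(ts);                       // the element is in the window first...
            maxSeen = Math.max(maxSeen, ts);
            long watermark = maxSeen - maxDelay;  // ...then the watermark advances
            if (watermark >= triggerTime) {
                int later = 0;
                for (long b : buffer) {
                    if (b > triggerTime) {
                        later++;
                    }
                }
                return later;
            }
        }
        return -1;
    }
}
```

With no delay, the element at 01:03:06 is already in the buffer when the watermark first reaches 01:00:00, so an iterate-everything window function would count it an hour early; with a delay of 3000 s, two such elements are already buffered.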

Regarding the firing timestamp, it’s currently not possible to get that from within a WindowFunction.

Best,
Aljoscha


On 16. Jul 2017, at 12:16, jad mad <[hidden email]> wrote:

Hello Aljoscha,

Thank you very much for your reply. My issue is two-fold.
First of all,
what I want to achieve is a GlobalWindows window that fires
periodically, say every hour or every day, and then does some custom calculation.
The custom trigger part I've implemented seems to work well.

Currently, every time my custom trigger fires, the iterable
passed to my custom WindowFunction contains the whole input from start to end, rather than
only the input from the start up to the (event-time) timestamp at which the trigger fires.
I've been working on this for a week now but haven't been able to find a solution yet.

Input example:
2017-07-16 00:00:01, x
2017-07-16 00:00:12, x
2017-07-16 01:03:06, x
2017-07-16 02:20:10, x

In this case, a GlobalWindows with a 1-hour periodic trigger, designed to count the cumulative records in MyWindowFunction, should emit something like
2017-07-16 00:00:00 ~ 2017-07-16 01:00:00, 2
2017-07-16 00:00:00 ~ 2017-07-16 02:00:00, 3
2017-07-16 00:00:00 ~ 2017-07-16 03:00:00, 4
↑ the start timestamp doesn't change!

Now, what I get is:
2017-07-16 00:00:00 ~ , 4
2017-07-16 00:00:00 ~ , 4
2017-07-16 00:00:00 ~ , 4
↑ every line has the same result...

public class MyWindowFunction<T, W extends Window> implements WindowFunction<Tuple2<String, String>, Tuple3<String, String, String>, Tuple, W> {

    @Override
    public void apply(Tuple tuple, W window, Iterable<Tuple2<String, String>> iterable, Collector<Tuple3<String, String, String>> out) throws Exception {

        for (Tuple2<String, String> element : iterable) {
            ...
        }
        out.collect(new Tuple3<String, String, String>("...", "...", "..."));
    }
}
Second, for a GlobalWindows that fires periodically, how do you get the firing timestamp inside
MyWindowFunction? (the missing end timestamp after ~ in the example above)

really appreciate the help!
jad


On Sun, Jul 16, 2017 at 6:15 PM, Aljoscha Krettek <[hidden email]> wrote:

Re: global window trigger

Aljoscha Krettek
Hi,

Yes, you can have state in a WindowFunction if you use Flink’s state abstraction, which you can access from a RichWindowFunction via the RuntimeContext (or by using a ProcessWindowFunction).
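One way to use such state for cumulative counts is to purge the buffered elements at every firing but keep a running total per key. The plain-Java sketch below models only that bookkeeping: the `HashMap` stands in for keyed state obtained through the RuntimeContext, and `RunningTotalModel` and `onFire` are hypothetical names, not Flink API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RunningTotalModel {
    // Stand-in for per-key state; in a real job this would live in Flink's keyed state.
    private final Map<String, Long> totalPerKey = new HashMap<>();

    // Called once per firing with only the elements buffered since the previous
    // FIRE_AND_PURGE; adds them to the key's running total and returns it.
    long onFire(String key, List<?> elementsSinceLastFiring) {
        long total = totalPerKey.getOrDefault(key, 0L) + elementsSinceLastFiring.size();
        totalPerKey.put(key, total);
        return total;
    }
}
```

Key "a" reports 2, then 3, then 4, while key "b" is counted separately. State stays at one number per key instead of every element, but the individual elements are no longer available at later firings.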

Trigger purging behaviour makes a difference if the Trigger fires repeatedly before the watermark reaches the end of the window, for example a trigger that speculatively fires early. In those cases it can make sense to distinguish between firing-and-purging and just firing, depending on whether you want all accumulated window contents or only those elements that have accumulated since the last trigger firing.

GlobalWindows is not implemented by setting the allowed lateness very high. It is a WindowAssigner whose single window has Long.MAX_VALUE as its max timestamp, so the watermark will never pass the end of that GlobalWindow.

Regarding your use case: since you want to keep all data since the start, I would suggest using GlobalWindows, a custom Trigger that fires periodically, and a ProcessWindowFunction. In the ProcessWindowFunction you can make sure to only process the elements you want, based on their timestamps and the current event time, which you can access from a ProcessWindowFunction.
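The suggested combination can be modelled in plain Java to check the expected numbers. This sketch assumes the buffer is never purged, that the periodic trigger fires at the next full interval after each element (`nextFiring`), and that the processing step counts only elements stamped strictly before the firing time; `CumulativeCountModel` and its methods are hypothetical, not Flink API. Timestamps are seconds after 2017-07-16 00:00:00, from the input example earlier in the thread.

```java
import java.util.ArrayList;
import java.util.List;

class CumulativeCountModel {
    // Next firing time strictly after `ts`, aligned to multiples of `interval`
    // (e.g. the next full hour), as a periodic trigger might register it.
    static long nextFiring(long ts, long interval) {
        return ts - Math.floorMod(ts, interval) + interval;
    }

    // For each firing time, counts the buffered elements stamped strictly before it.
    // The buffer is never purged, so each count is cumulative since the start, and
    // elements that arrived "early" (stamped after the firing) are simply skipped.
    static List<Long> cumulativeCounts(long[] buffer, long[] firings) {
        List<Long> counts = new ArrayList<>();
        for (long firing : firings) {
            long n = 0;
            for (long ts : buffer) {
                if (ts < firing) {
                    n++;
                }
            }
            counts.add(n);
        }
        return counts;
    }
}
```

For firings at 01:00, 02:00 and 03:00 this yields 2, 3 and 4, the cumulative output expected in the thread, even if the buffer already holds later elements when an earlier firing happens.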

If you don’t want to keep all events indefinitely (which could eventually blow up your state size) you can use an Evictor to sometimes evict certain events from the window buffers.
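An eviction policy of this kind can be sketched in plain Java. The model keeps only elements within a retention period of the newest buffered element, which is roughly the idea behind Flink's TimeEvictor; the code is an independent sketch with hypothetical names, not that class. Timestamps are seconds after 2017-07-16 00:00:00.

```java
import java.util.ArrayList;
import java.util.List;

class EvictionModel {
    // Returns a new list containing only the elements whose timestamp is within
    // `retention` of the newest timestamp in the buffer; older ones are evicted.
    static List<Long> evictOlderThan(List<Long> buffer, long retention) {
        List<Long> kept = new ArrayList<>();
        if (buffer.isEmpty()) {
            return kept;
        }
        long newest = Long.MIN_VALUE;
        for (long ts : buffer) {
            newest = Math.max(newest, ts);
        }
        long cutoff = newest - retention;   // elements at or before this are evicted
        for (long ts : buffer) {
            if (ts > cutoff) {
                kept.add(ts);
            }
        }
        return kept;
    }
}
```

With a one-hour retention only the 02:20:10 element survives. Note that evicting elements also means the counts are no longer cumulative since the very start.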

Best,
Aljoscha

On 20. Jul 2017, at 12:24, jad mad <[hidden email]> wrote:

Hello Aljoscha,

> I’m afraid this will not work well because a WindowAssigner should be stateless
Ok, I understand this now.
How about inside a custom WindowFunction(...)? Is it a bad idea to have state there as well?

The default trigger for TumblingEventTimeWindows is EventTimeTrigger.
Looking at its source, there are a few "return TriggerResult.FIRE;" but not a single PURGE.
Even so, the contents get cleared each time the time passes a window end.
Is this what you meant by
>When the watermark passes the end of a window plus the allowed lateness the window contents are being purged.
?
If yes, whether a trigger returns TriggerResult.FIRE or TriggerResult.FIRE_AND_PURGE
seems less important, because the contents will be cleared anyway,
and the amount of allowed lateness matters more?
And is GlobalWindows implemented by setting the "lateness" to a huge number
so that it keeps everything in it?

So, back to my original question:
to keep everything from the start like GlobalWindows, let it fire periodically, and
then perform some calculations, what combination of window assigner, trigger, and/or custom window function
should I use? Even better if there were a simple working sample.

thank you a lot!
jad

On Thu, Jul 20, 2017 at 5:45 PM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

I’m afraid this will not work well because a WindowAssigner should be stateless, i.e. it should not keep any state in fields. The reason is that there can be several WindowAssigners used on the different partitions and the order in which a WindowAssigner sees the incoming elements is also not guaranteed. That is, you might set a timestamp in the “first_timestamp” field that is not chronologically the “first timestamp”.

The reason for your windows being purged is probably the allowed lateness, which is zero by default. When the watermark passes the end of a window plus the allowed lateness the window contents are being purged. You can configure the allowed lateness via WindowedStream.allowedLateness(). You should be careful, though, because of you set this too high you might never clean up your window state and therefore have ever growing state.

Best,
Aljoscha

On 18. Jul 2017, at 15:05, jad mad <[hidden email]> wrote:

Aljoscha,

what a great answer and this is what I'd expected!

as a workaround I've modified the EventTimeSlidingWindow a little bit to a custom WindowAssigner like below : 
the a few differences are 
1.storing the first timestamp in a variable "first_timestamp", 
2.used this time stamp as the any following windows' start time.
@PublicEvolving
public class MySlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private final long size;
private final long slide;
private final long offset;
private long first_timestamp = -1L; // added by me!

protected MySlidingEventTimeWindows(long size, long slide, long offset) {
if(offset >= 0L && offset < slide && size > 0L) {
this.size = size;
this.slide = slide;
this.offset = offset;
} else {
throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
}
}

public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if(timestamp <= -9223372036854775808L) {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
} else {
if(this.first_timestamp == -1L) {// added by me!
this.first_timestamp = timestamp;
System.out.println("===================== " + this.first_timestamp + " ========================");
}
List<TimeWindow> windows = new ArrayList((int)(this.size / this.slide));
long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, this.offset, this.slide);

for(long start = lastStart; start > timestamp - this.size; start -= this.slide) {
//windows.add(new TimeWindow(start, start + this.size)); // original implementation
windows.add(new TimeWindow(this.first_timestamp, start + this.size)); // modified by me!
}
return windows;
}
}
the result I get from MyWindowFunction(...) is like below : 
2017-01-01 00:17:39 2017-01-01 00:00:01 2
2017-01-01 00:17:39 2017-01-01 00:00:02 4
2017-01-01 00:17:39 2017-01-01 00:00:03 4
2017-01-01 00:17:39 2017-01-01 00:00:04 10
2017-01-01 00:17:39 2017-01-01 00:00:05 19
2017-01-01 00:17:39 2017-01-01 00:00:06 19
2017-01-01 00:17:39 2017-01-01 00:00:07 20
2017-01-01 00:17:39 2017-01-01 00:00:08 23
2017-01-01 00:17:39 2017-01-01 00:00:09 21
2017-01-01 00:17:39 2017-01-01 00:00:10 7
2017-01-01 00:17:39 2017-01-01 00:00:11 2
2017-01-01 00:17:39 2017-01-01 00:00:12 5
2017-01-01 00:17:39 2017-01-01 00:00:13 12
2017-01-01 00:17:39 2017-01-01 00:00:14 17
2017-01-01 00:17:39 2017-01-01 00:00:15 9
2017-01-01 00:17:39 2017-01-01 00:00:16 8

things I don't seem to understand are 
1. when my inputs' first line time stamp is 2017-01-01 00:00:00 why is 2017-01-01 00:17:39 shown up in my result as 
     each sliding window's start time?
    basically, I'm just printing out the time stamp came with the first iterable object's element in MyWindowFunction.
2. I made MyWindowAssigner in a hope that the starting time is fixed and the contents not being purged.
    however, from the results, we can see it works just as a normal EventTimeSlidingWindow with contents
    been purged.
    How can I make it not to throw away its window contents even after each time firing.
3. this MyWindowAssigner(...) attempt arose as an effort based on your previous advice using a 
    different WindowFunction. wonder if I'm heading to the right direction or not.
    
thank you very much!
jad

On Mon, Jul 17, 2017 at 7:22 PM, Aljoscha Krettek <[hidden email]> wrote:
Ah, I see. The problem is that the watermark has slightly tricky semantics: A watermark T says that there will not be elements with a timestamp <= T in the future. It does not say, that there have not yet been elements with a timestamp > T. In your specific case, this means that there will be elements in the GlobalWindow that have a timestamp that is after the firing timestamp of your trigger. If you want to make sure that windows are somehow put into buckets, based on their timestamp then you need to use a different WindowFunction, because GlobalWindows simply puts every element into the same bucket (window).

Regarding the firing timestamp, it’s currently not possible the get that from within a WindowFunction.

Best,
Aljoscha


On 16. Jul 2017, at 12:16, jad mad <[hidden email]> wrote:

Hello Aljoscha,

thank you very much for your reply. the issue with me is two-fold.
first of all, 
the thing I wanted to achieve was having a GlobalWindows and let it fire 
periodically, say 1 hour or 1 day, and then do some custom calculation.
this custom trigger part I've implemented seems working well.

currently, when every time my custom trigger fires periodically, the elements of iterable object
passed onto my custom WindowFunction contains whole inputs from the start to the end rather than
from start to the timing(event time timestamp) where each time trigger fires.
have been worked on this for a week now but not being able to find any solution yet.

Input example:
2017-07-16 00:00:01, x
2017-07-16 00:00:12, x
2017-07-16 01:03:06, x
2017-07-16 02:20:10, x

In this case, a GlobalWindows with a 1-hour periodic trigger, designed to count the cumulative records in MyWindowFunction, should emit something like
2017-07-16 00:00:00 ~ 2017-07-16 01:00:00, 2
2017-07-16 00:00:00 ~ 2017-07-16 02:00:00, 3
2017-07-16 00:00:00 ~ 2017-07-16 03:00:00, 4
↑ the start timestamp doesn't change!

Now, what I get is
2017-07-16 00:00:00 ~ , 4
2017-07-16 00:00:00 ~ , 4
2017-07-16 00:00:00 ~ , 4
↑ every line has the same result...
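One way to compute exactly this expected output, following the suggestion to bucket elements by their own timestamp instead of using GlobalWindows, is to count per 1-hour tumbling window and then keep a running total. Below is a plain-Java sketch of that computation only (not Flink API; HourlyCumulativeCount and cumulative are hypothetical names). How it would map onto Flink operators, e.g. hourly event-time windows followed by a stateful running sum, is an assumption and is not covered in this thread.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch, NOT Flink code: hourly tumbling buckets plus a running total.
final class HourlyCumulativeCount {

    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static List<String> cumulative(List<LocalDateTime> timestamps) {
        // 1) Tumbling 1-hour buckets keyed by the start of the hour each record falls in.
        TreeMap<LocalDateTime, Integer> perHour = new TreeMap<>();
        for (LocalDateTime ts : timestamps) {
            perHour.merge(ts.truncatedTo(ChronoUnit.HOURS), 1, Integer::sum);
        }
        // 2) Running total over the hourly counts, in event-time order.
        List<String> lines = new ArrayList<>();
        if (perHour.isEmpty()) {
            return lines;
        }
        LocalDateTime start = perHour.firstKey(); // the start never changes
        int total = 0;
        for (Map.Entry<LocalDateTime, Integer> e : perHour.entrySet()) {
            total += e.getValue();
            LocalDateTime end = e.getKey().plusHours(1);
            lines.add(FMT.format(start) + " ~ " + FMT.format(end) + ", " + total);
        }
        return lines;
    }

    public static void main(String[] args) {
        List<LocalDateTime> input = Arrays.asList(
                LocalDateTime.parse("2017-07-16T00:00:01"),
                LocalDateTime.parse("2017-07-16T00:00:12"),
                LocalDateTime.parse("2017-07-16T01:03:06"),
                LocalDateTime.parse("2017-07-16T02:20:10"));
        cumulative(input).forEach(System.out::println);
    }
}
```

Hours without any record produce no line in this sketch; emitting them as well would mean iterating over every hour between the first and last record instead of only the non-empty buckets.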

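import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;

public class MyWindowFunction<T, W extends Window>
        implements WindowFunction<Tuple2<String, String>, Tuple3<String, String, String>, Tuple, W> {

    @Override
    public void apply(Tuple tuple, W window, Iterable<Tuple2<String, String>> iterable,
                      Collector<Tuple3<String, String, String>> out) throws Exception {
        // With GlobalWindows (and no purging), 'iterable' holds every element assigned to this key so far.
        for (Tuple2<String, String> element : iterable) {
            // ...
        }
        out.collect(new Tuple3<String, String, String>("...", "...", "..."));
    }
}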
Secondly, for a GlobalWindows that fires periodically, how do you get the firing timestamp inside
your MyWindowFunction? (i.e. the missing end timestamp after "~" in the example above)

really appreciate the help!
jad


On Sun, Jul 16, 2017 at 6:15 PM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

Ok, then I misunderstood. Yes, a PurgingTrigger is similar to (in fact, the same as) always returning FIRE_AND_PURGE instead of FIRE in a custom Trigger. I thought your problem was that data is never cleared away when using GlobalWindows. Is that not the case?
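For the counting use case, the difference between FIRE and FIRE_AND_PURGE can be modelled in plain Java (hypothetical names, not Flink's API). The model assumes sorted timestamps and that each record enters the window only just before the firing that covers it, i.e. no record arrives ahead of the watermark; it returns the count the window function would compute at each hourly firing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, NOT Flink code: what a counting window function sees per
// firing when the trigger keeps (FIRE) or drops (FIRE_AND_PURGE) window contents.
final class FireVsPurgeModel {

    static List<Integer> countsPerFiring(long[] sortedTimestamps, long[] firingTimes, boolean purge) {
        List<Long> windowContents = new ArrayList<>();
        List<Integer> emitted = new ArrayList<>();
        int next = 0;
        for (long fireAt : firingTimes) {
            // Assumption: records up to this firing time have arrived, later ones have not.
            while (next < sortedTimestamps.length && sortedTimestamps[next] <= fireAt) {
                windowContents.add(sortedTimestamps[next++]);
            }
            emitted.add(windowContents.size()); // the window function counts its Iterable
            if (purge) {
                windowContents.clear(); // FIRE_AND_PURGE: contents dropped after evaluation
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] ts = {1, 12, 3786, 8410};      // seconds after 2017-07-16 00:00:00
        long[] firings = {3600, 7200, 10800}; // 01:00, 02:00, 03:00
        System.out.println(countsPerFiring(ts, firings, false)); // FIRE:           [2, 3, 4]
        System.out.println(countsPerFiring(ts, firings, true));  // FIRE_AND_PURGE: [2, 1, 1]
    }
}
```

Keeping the contents yields cumulative counts and purging yields per-hour counts, but the cumulative result only holds under the model's assumption; records that are already in the window ahead of the watermark inflate every firing.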

Best,
Aljoscha

On 14. Jul 2017, at 16:29, jad mad <[hidden email]> wrote:

Hi Aljoscha

thanks for the comment.
Is wrapping a trigger in PurgingTrigger.of() the same as doing "return TriggerResult.FIRE_AND_PURGE;"
inside a custom trigger?

I gave it a test, and the result seems to be the opposite of what I want...
instead of throwing away previous windows' contents, I want to keep them
all the way to the end,
so that I can get cumulative counts over all input.

Any idea how to achieve this?

jad


On Fri, Jul 14, 2017 at 12:16 AM, Aljoscha Krettek <[hidden email]> wrote:
Window contents are only purged from state if the Trigger says so or if the watermark passes the garbage collection horizon for a given window. With GlobalWindows the GC horizon is never reached, which leaves Triggers.
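The garbage-collection rule above can be sketched as a simplified model (not Flink's actual operator code; names are hypothetical). It assumes the horizon is the window's max timestamp plus an allowed lateness, saturating at Long.MAX_VALUE on overflow, and that an end-of-time horizon is never treated as reached. The single window used by GlobalWindows has a max timestamp of Long.MAX_VALUE, so in this model its state is never collected.

```java
// Simplified model, NOT Flink's operator code: when does a window's state get
// garbage-collected based on the watermark alone?
final class GcHorizonModel {

    // Assumption: horizon = maxTimestamp + allowedLateness, saturating on overflow.
    static long gcHorizon(long maxTimestamp, long allowedLateness) {
        long horizon = maxTimestamp + allowedLateness;
        return horizon >= maxTimestamp ? horizon : Long.MAX_VALUE;
    }

    static boolean isGarbageCollected(long maxTimestamp, long allowedLateness, long watermark) {
        long horizon = gcHorizon(maxTimestamp, allowedLateness);
        // Assumption: an "end of time" horizon is never considered reached.
        return horizon != Long.MAX_VALUE && watermark >= horizon;
    }

    public static void main(String[] args) {
        long hourWindowMax = 3_599_999L; // window [0 ms, 1 h): max timestamp is end - 1
        System.out.println(isGarbageCollected(hourWindowMax, 0L, 3_600_000L));      // true
        System.out.println(isGarbageCollected(Long.MAX_VALUE, 0L, Long.MAX_VALUE)); // false
    }
}
```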

You can create a Trigger that purges every time it fires by wrapping it in a PurgingTrigger, e.g.

.trigger(PurgingTrigger.of(<my trigger>))
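The wrapping idea can be illustrated with hypothetical stand-in types (this is not Flink's Trigger or TriggerResult API, which has more callbacks and a trigger context): the wrapper forwards each decision of the nested trigger and upgrades any firing decision to fire-and-purge, leaving every other decision untouched.

```java
// Hypothetical stand-in types, NOT Flink's classes: a decorator that turns
// every "fire" decision of a nested trigger into "fire and purge".
final class PurgingWrapperModel {

    enum Result {
        CONTINUE, FIRE, PURGE, FIRE_AND_PURGE;

        boolean isFire() {
            return this == FIRE || this == FIRE_AND_PURGE;
        }
    }

    interface SimpleTrigger {
        Result onEventTime(long time);
    }

    static SimpleTrigger purging(SimpleTrigger nested) {
        return time -> {
            Result r = nested.onEventTime(time);
            return r.isFire() ? Result.FIRE_AND_PURGE : r;
        };
    }

    public static void main(String[] args) {
        // Hypothetical nested trigger: fire on every full hour (times in ms).
        SimpleTrigger hourly = time -> time % 3_600_000L == 0 ? Result.FIRE : Result.CONTINUE;
        SimpleTrigger wrapped = purging(hourly);
        System.out.println(wrapped.onEventTime(3_600_000L)); // FIRE_AND_PURGE
        System.out.println(wrapped.onEventTime(1_000L));     // CONTINUE
    }
}
```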

Best,
Aljoscha

On 13. Jul 2017, at 14:00, jad mad <[hidden email]> wrote:

Hi Prashant,

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Actually, I did manage to make my custom trigger fire periodically.
The problem is that the set of elements in the iterable
is always the same, which is not what I'm expecting...

private static class MyWindowFunction_Window...
         ...    
       @Override
        public void apply(Tuple tuple, W window, Iterable<MyClass> iterable,
             ...
             for(MyClass element : iterable)

does anyone have any idea on this?
thanks a lot in advance,
jad


On Thu, Jul 13, 2017 at 10:55 AM, prashantnayak <[hidden email]> wrote:
Hi

We have custom operators using global windows and are using event time.

How are you specifying event time as the time characteristic?

Prashant


