For a global window with a custom event-time trigger that fires every minute, followed by a custom window function, the trigger seems to fire correctly. However, the element collection I get inside my custom WindowFunction always contains the whole input from start to end, rather than the subset from the start up to the end of each 1-minute firing (maxTimestamp). Is this because GlobalWindows is a processing-time operator that does not work with event time? Thanks a lot, |
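The custom trigger itself is never shown in the thread. As a rough illustration only, the plain-Java sketch below (no Flink classes; PeriodicTimerModel, nextFireTime and firingsUpTo are hypothetical names) shows one way a trigger that fires every minute in event time could align its timers to minute boundaries, and which of those timers a watermark releases once it has advanced. It assumes non-negative millisecond timestamps and that an event-time timer fires once the watermark reaches its timestamp. Flink also ships a ContinuousEventTimeTrigger for periodic event-time firing, but its exact logic may differ from this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (no Flink dependency) of a periodic event-time trigger's timers.
class PeriodicTimerModel {

    // Next firing time for an element with the given event timestamp:
    // the end of the interval-aligned bucket that contains it.
    // Assumes non-negative timestamps in milliseconds.
    static long nextFireTime(long timestamp, long intervalMs) {
        long start = timestamp - (timestamp % intervalMs);
        return start + intervalMs;
    }

    // All aligned firing times, starting after firstTimestamp, that a watermark
    // which has advanced to `watermark` would have released (timer <= watermark).
    static List<Long> firingsUpTo(long firstTimestamp, long watermark, long intervalMs) {
        List<Long> firings = new ArrayList<>();
        for (long t = nextFireTime(firstTimestamp, intervalMs); t <= watermark; t += intervalMs) {
            firings.add(t);
        }
        return firings;
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        System.out.println(nextFireTime(90_000L, oneMinute));          // 120000
        System.out.println(firingsUpTo(90_000L, 250_000L, oneMinute)); // [120000, 180000, 240000]
    }
}
```

If the whole input is already buffered before the watermark advances (for example a finite input that is read quickly), several such timers can fire in quick succession over the same buffer, which is one plausible way to end up seeing the full input at every firing.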
Hi
We have custom operators that use global windows with event time. How are you specifying event time as the time characteristic? Prashant |
Hi Prashant,
I set it with env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Actually, I could make my custom trigger fire periodically. The problem is that the set of elements stored in the iterable variable is always the same, which is not what I'm expecting:
    private static class MyWindowFunction_Window...
        ...
        @Override
        public void apply(Tuple tuple, W window, Iterable<MyClass> iterable, ...
            for (MyClass element : iterable)
Does anyone have any idea about this? Thanks a lot in advance,
jad
On Thu, Jul 13, 2017 at 10:55 AM, prashantnayak wrote: |
Window contents are only purged from state if the Trigger says so, or if the watermark passes the garbage-collection horizon of a given window. With GlobalWindows the GC horizon is never reached, so that leaves the Trigger.
You can create a Trigger that purges every time it fires by wrapping it in a PurgingTrigger, i.e. .trigger(PurgingTrigger.of(<my trigger>)). Best, Aljoscha
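To make the difference between firing and firing-and-purging concrete, here is a plain-Java model (not Flink code; PurgeModel and run are hypothetical names, and Result only stands in for Flink's TriggerResult.FIRE and TriggerResult.FIRE_AND_PURGE) of one window buffer that receives a batch of elements before each firing. It assumes the window function sees a snapshot of the buffer at each firing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of a single window buffer under two trigger behaviours.
class PurgeModel {

    // Stand-ins for Flink's TriggerResult.FIRE and TriggerResult.FIRE_AND_PURGE.
    enum Result { FIRE, FIRE_AND_PURGE }

    // Adds each batch to the buffer, then "fires": records what the window
    // function would see, and clears the buffer if the result also purges.
    static List<List<Integer>> run(List<List<Integer>> batches, Result result) {
        List<Integer> buffer = new ArrayList<>();
        List<List<Integer>> seenPerFiring = new ArrayList<>();
        for (List<Integer> batch : batches) {
            buffer.addAll(batch);                       // elements arrive before the next firing
            seenPerFiring.add(new ArrayList<>(buffer)); // window function sees the current contents
            if (result == Result.FIRE_AND_PURGE) {
                buffer.clear();                         // purge: drop contents after firing
            }
        }
        return seenPerFiring;
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3), Arrays.asList(4, 5));
        System.out.println(run(batches, Result.FIRE));           // [[1, 2], [1, 2, 3], [1, 2, 3, 4, 5]]
        System.out.println(run(batches, Result.FIRE_AND_PURGE)); // [[1, 2], [3], [4, 5]]
    }
}
```

With FIRE every firing sees everything accumulated so far; with FIRE_AND_PURGE each firing only sees the elements added since the previous one.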
|
Hi Aljoscha, thanks for the comment. Is wrapping with PurgingTrigger.of() the same as returning TriggerResult.FIRE_AND_PURGE inside a custom trigger? I gave it a test and the result seems to be the opposite of what I meant: instead of throwing away previous windows' contents, I want to keep them all until the end. That way I can get cumulative counts of all input. I wonder how to achieve that. Anyone? jad On Fri, Jul 14, 2017 at 12:16 AM, Aljoscha Krettek wrote:
|
Hi,
Ok, then I misunderstood. Yes, wrapping a trigger in a PurgingTrigger is equivalent to always returning FIRE_AND_PURGE instead of FIRE in a custom Trigger. I thought your problem was that data is never cleared away when using GlobalWindows. Is that not the case? Best, Aljoscha
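A PurgingTrigger can be thought of as a thin wrapper over the nested trigger's results. The plain-Java sketch below models that idea; the enum mirrors the four outcomes of Flink's TriggerResult (fire or not, purge or not), but PurgingWrapperModel, purging and the minute-based nested trigger are hypothetical, and a real Flink Trigger has several callbacks (onElement, onEventTime, onProcessingTime) rather than the single function used here.

```java
import java.util.function.LongFunction;

// Plain-Java model of wrapping a trigger so that every firing also purges.
class PurgingWrapperModel {

    // Mirrors the four outcomes of Flink's TriggerResult.
    enum TriggerResult {
        CONTINUE(false, false), FIRE(true, false), PURGE(false, true), FIRE_AND_PURGE(true, true);

        final boolean fire;
        final boolean purge;

        TriggerResult(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    // Wraps a callback ("what to do when an event-time timer at `time` fires") so that
    // any firing result becomes FIRE_AND_PURGE; CONTINUE and PURGE pass through unchanged.
    static LongFunction<TriggerResult> purging(LongFunction<TriggerResult> nested) {
        return time -> {
            TriggerResult r = nested.apply(time);
            return r.fire ? TriggerResult.FIRE_AND_PURGE : r;
        };
    }

    public static void main(String[] args) {
        // Hypothetical nested trigger: fire on whole-minute timers, otherwise continue.
        LongFunction<TriggerResult> everyMinute =
                time -> time % 60_000L == 0 ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        LongFunction<TriggerResult> wrapped = purging(everyMinute);
        System.out.println(wrapped.apply(120_000L)); // FIRE_AND_PURGE
        System.out.println(wrapped.apply(90_000L));  // CONTINUE
    }
}
```

This is why the wrapped version discards the contents after each firing: to keep all contents for cumulative results, the nested trigger's plain FIRE has to be used without the wrapper.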
|
Hi,
I’m afraid this will not work well, because a WindowAssigner should be stateless, i.e. it should not keep any state in fields. There can be several WindowAssigner instances used on the different partitions, and the order in which a WindowAssigner sees the incoming elements is not guaranteed either. That is, you might store a timestamp in the “first_timestamp” field that is not chronologically the first timestamp. The reason your windows are being purged is probably the allowed lateness, which is zero by default: when the watermark passes the end of a window plus the allowed lateness, the window contents are purged. You can configure the allowed lateness via WindowedStream.allowedLateness(). Be careful, though: if you set it too high you might never clean up your window state and end up with ever-growing state. Best, Aljoscha
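The cleanup rule can be sketched as arithmetic on timestamps. The plain-Java model below (CleanupModel, cleanupTime and isCleanedUp are hypothetical names, not Flink API) assumes event-time windows, treats a window as expired once the watermark reaches its max timestamp plus the allowed lateness, and caps that sum at Long.MAX_VALUE so it cannot overflow.

```java
// Plain-Java model of when an event-time window's contents become eligible for cleanup.
class CleanupModel {

    // Window max timestamp plus allowed lateness; if the sum overflows,
    // treat the window as never expiring (Long.MAX_VALUE).
    static long cleanupTime(long maxTimestamp, long allowedLateness) {
        long t = maxTimestamp + allowedLateness;
        return t >= maxTimestamp ? t : Long.MAX_VALUE;
    }

    // True once the watermark has reached the cleanup time.
    // A cleanup time of Long.MAX_VALUE is treated as "never".
    static boolean isCleanedUp(long maxTimestamp, long allowedLateness, long watermark) {
        long t = cleanupTime(maxTimestamp, allowedLateness);
        return t != Long.MAX_VALUE && watermark >= t;
    }

    public static void main(String[] args) {
        long windowEnd = 59_999L; // last timestamp of the window [0, 60000)
        System.out.println(isCleanedUp(windowEnd, 0L, 60_000L));             // true: default lateness of 0
        System.out.println(isCleanedUp(windowEnd, 30_000L, 60_000L));        // false: still within lateness
        System.out.println(isCleanedUp(Long.MAX_VALUE, 0L, Long.MAX_VALUE)); // false: never expires
    }
}
```

The last call shows why a window whose max timestamp is Long.MAX_VALUE is never cleaned up by the watermark, no matter how the allowed lateness is set.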
|
Hi,
Yes, you can have state in a WindowFunction if you use Flink’s state abstraction, which you can access from a RichWindowFunction via the RuntimeContext (or by using a ProcessWindowFunction).
Trigger purging behaviour makes a difference if the Trigger fires repeatedly before the watermark reaches the end of the window, for example a trigger that speculatively fires early. In those cases it can make sense to distinguish between firing and purging and just firing, depending on whether you want all accumulated window contents or only the elements that have accumulated since the last firing.
GlobalWindows is not implemented by setting the allowed lateness very high. It is a WindowAssigner whose window has Long.MAX_VALUE as its max timestamp, so the watermark will never pass the end of that GlobalWindow.
Regarding your use case: since you want to keep all data since the start, I would suggest using GlobalWindows, a custom Trigger that fires periodically, and a ProcessWindowFunction. In the ProcessWindowFunction you can make sure to only process the elements you want, based on their timestamps and the current event time, which you can access from a ProcessWindowFunction. If you don’t want to keep all events indefinitely (which could eventually blow up your state size), you can use an Evictor to evict certain events from the window buffers from time to time.
Best, Aljoscha
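The counting logic of this suggestion can be checked without Flink. In the sketch below (CumulativeCountModel, countUpTo and cumulativeCounts are hypothetical names), the global window's buffer is never purged, and at each periodic firing only elements whose event timestamp is at or before the firing time are counted, which yields cumulative counts. It assumes the window function knows the firing time; if it uses the current watermark instead, that watermark can already be ahead of the timer that fired (for example at the end of a finite input), so a cutoff based on the watermark alone may still include later elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model: an unpurged buffer, filtered by event time at each firing.
class CumulativeCountModel {

    // Counts buffered elements whose timestamp is <= the cutoff (the firing time).
    static long countUpTo(List<Long> bufferedTimestamps, long cutoff) {
        long count = 0;
        for (long ts : bufferedTimestamps) {
            if (ts <= cutoff) {
                count++;
            }
        }
        return count;
    }

    // One cumulative count per firing, all evaluated against the same buffer,
    // e.g. when the whole input was ingested before the timers fired.
    static List<Long> cumulativeCounts(List<Long> bufferedTimestamps, List<Long> firingTimes) {
        List<Long> counts = new ArrayList<>();
        for (long firing : firingTimes) {
            counts.add(countUpTo(bufferedTimestamps, firing));
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Long> buffer = Arrays.asList(10_000L, 50_000L, 70_000L, 130_000L, 170_000L);
        List<Long> firings = Arrays.asList(60_000L, 120_000L, 180_000L);
        System.out.println(cumulativeCounts(buffer, firings)); // [2, 3, 5]
    }
}
```

Even though every firing sees the whole buffer, filtering by timestamp gives each firing only the elements from the start up to its own firing time.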
|