Hi,
I'm trying to understand how the lifecycle of messages / state is managed by Flink, but I'm failing to find any documentation.

Specifically, if I'm using a windowed stream and a type of trigger that retains the elements of the window to allow for processing of late data, e.g. ContinuousEventTimeTrigger, where are the contents of the windows, or their intermediate computation results, stored, and when is the data removed?

I'm thinking in terms of Google's Dataflow API, where setting a window's withAllowedLateness option allows the caller to control how long past the end of a window the data should be maintained. Does Flink have anything similar?

Thanks,

Andy
Hi,
the window contents are stored in state managed by the window operator at all times, until they are purged by a Trigger returning PURGE from one of its on*() methods. Out of the box, Flink does not have something akin to the lateness and cleanup of Google Dataflow. You can, however, implement it yourself using a custom Trigger. This is an example that mimics Google Dataflow:

import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private final boolean accumulating;
    private final long allowedLateness;

    private EventTimeTrigger(boolean accumulating, long allowedLateness) {
        this.accumulating = accumulating;
        this.allowedLateness = allowedLateness;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        if (time == window.maxTimestamp()) {
            if (accumulating && allowedLateness > 0) {
                // register the cleanup timer that will purge the window once the lateness has passed
                ctx.registerEventTimeTimer(window.maxTimestamp() + allowedLateness);
                return TriggerResult.FIRE;
            } else {
                // discarding, or accumulating with zero lateness: purge right away,
                // otherwise the window state would never be cleaned up
                return TriggerResult.FIRE_AND_PURGE;
            }
        } else if (time == window.maxTimestamp() + allowedLateness) {
            return TriggerResult.PURGE;
        }

        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public String toString() {
        return "EventTimeTrigger()";
    }

    /**
     * Creates an event-time trigger that fires once the watermark passes the end of the window.
     *
     * <p>Once the trigger fires, all elements are discarded. Elements that arrive late
     * immediately trigger window evaluation with just this one element.
     */
    public static EventTimeTrigger discarding() {
        return new EventTimeTrigger(false, 0L);
    }

    /**
     * Creates an event-time trigger that fires once the watermark passes the end of the window.
     *
     * <p>This trigger does not immediately discard all elements once it fires. Only after the
     * watermark passes the specified lateness are the window elements discarded, without
     * emitting a new result. If a late element arrives within the specified lateness,
     * the window is computed again and a new result is emitted.
     */
    public static EventTimeTrigger accumulating(AbstractTime allowedLateness) {
        return new EventTimeTrigger(true, allowedLateness.toMilliseconds());
    }
}

You can specify a lateness, and while that time has not yet been reached the windows remain; late-arriving elements trigger window emission with the complete window contents.

Cheers,
Aljoscha
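P.S. In case a complete wiring helps, here is a minimal sketch of using the trigger above in a job. MySource, MyTimestampExtractor and the Tuple2 schema are placeholders I made up, and the exact windowing API may differ slightly between Flink versions:

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class LatenessExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // use event time, so that watermarks drive the trigger's timers
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // placeholder source and timestamp extractor; substitute your own
        DataStream<Tuple2<String, Long>> events = env
            .addSource(new MySource())
            .assignTimestamps(new MyTimestampExtractor());

        events
            .keyBy(0)
            .timeWindow(Time.of(1, TimeUnit.MINUTES))
            // keep the window contents for 10 minutes past the end of the
            // window; late elements within that time re-fire the window
            .trigger(EventTimeTrigger.accumulating(Time.of(10, TimeUnit.MINUTES)))
            .sum(1)
            .print();

        env.execute("Lateness example");
    }
}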
Thanks Aljoscha, that's very enlightening.

Can you please also explain what the default behaviour is? I.e. if I use one of the built-in accumulating triggers, when does the state get purged? (With your info I can now probably work things out, but you may give more insight. :)

Also, are there plans to add explicit lateness control to Flink core? (I'm aware of the Dataflow integration work.)

Thanks again,

Andy
Hi,
I imagine you are talking about CountTrigger, DeltaTrigger, and Continuous*Trigger. For these we never purge. They are a leftover artifact from an earlier approach to implementing windowing strategies that was inspired by IBM InfoSphere Streams. There, all triggers are essentially accumulating, and elements are evicted by an evictor. This is very flexible but makes it hard to implement windowing code efficiently. If you are interested, here is a Master's thesis that describes that earlier implementation: http://www.diva-portal.se/smash/get/diva2:861798/FULLTEXT01.pdf

These triggers are problematic because they never purge window contents unless you have an evictor that does the eviction. They also don't allow incremental aggregation over elements as they arrive, since you don't know what the contents of the window will be until the trigger fires and the evictor evicts.

So, as a short answer: the accumulating triggers never purge window state on their own. I hope this helps somehow.

Cheers,
Aljoscha
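P.S. If you do want one of those triggers to purge, you can wrap it in Flink's PurgingTrigger, which turns every FIRE of the wrapped trigger into FIRE_AND_PURGE. A minimal sketch; "events" stands in for any stream of (key, count) pairs:

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;

DataStream<Tuple2<String, Long>> counts = events
    .keyBy(0)
    .timeWindow(Time.of(1, TimeUnit.MINUTES))
    // fire every 100 elements and drop the window state after each firing,
    // instead of accumulating forever as the bare CountTrigger would
    .trigger(PurgingTrigger.of(CountTrigger.of(100)))
    .sum(1);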
Hi Aljoscha,

Thanks for the info!

Andy
Hi Aljoscha,

Just thinking on the EventTimeTrigger example you provided (and I'm going to apologise in advance for taking more of your time!): if I go down that route with any long allowedLateness, I'm thinking we'll run into issues with memory use, unless Flink is smart enough, configurable enough, or customisable enough to control where the ageing state is kept.

Thoughts?

Thanks!

Andy
Hi,
don’t worry, it’s good to get questions about this stuff. :D You are right: if Flink is not clever about the state, your JVMs can run out of memory and blow up. We are currently working on several things that should make this more robust:

1) Put Flink windows on Flink’s partitioned state abstraction (for this it needs to be enhanced a bit)
2) Provide more state backends

Having 1) and 2) allows choosing different state backends for the window operations without changing the program. For example, there is a state backend that stores state in memory, I’m working on a state backend that stores state in RocksDB (on disk), Gyula Fóra is working on a state backend that stores state in HDFS TFiles (if I’m not mistaken), and he also previously contributed the DB state backend that can store state in a SQL database.

Cheers,
Aljoscha
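P.S. To make that concrete: once the RocksDB backend lands, picking a backend should be a single call on the environment. This is only a sketch of what that might look like; the class name, package and constructor are my assumption, since the code is not merged yet:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; // assumed package
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// keep live window state in RocksDB on local disk, so it is not bounded by
// the JVM heap; only checkpoints are written to the given URI (assumed API)
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

The windowed program itself would stay unchanged; only this one line decides where the ageing state lives.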
Fantastic. Sounds like things are moving in the right direction. I'm hoping this will allow for tiered storage.

Thanks!