I have a requirement not to reject events, even if they are late (beyond the maximum allowed lateness). The way I achieve this is by overriding the tumbling window class: if an event is late, I update its event time to the last event time, and to identify such events I add an extra column in the DB, computed as (currentevent/msPerHour - eventtime/msPerHour).
currentevent is a class-level variable in the tumbling window. My problem: in case of a full runtime crash, can Flink recover the tumbling window state? If yes, how?
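A minimal plain-Java sketch of the re-stamping described above, assuming `currentevent` holds the largest event time seen so far, `msPerHour` is 3,600,000, and timestamps are non-negative epoch milliseconds. The class and method names are hypothetical and this is not Flink API. Because both divisions are integer divisions, the extra column counts how many hour buckets the original event lags behind `currentevent`.

```java
/** Hypothetical model of the custom late-event handling (plain Java, not Flink API). */
final class LateEventRestamper {
    static final long MS_PER_HOUR = 3_600_000L;

    private final long allowedLatenessMs;
    // "currentevent": the largest event time seen so far. A plain field like this
    // is not part of Flink's checkpointed state, so it is lost on a full crash.
    private long currentEvent = Long.MIN_VALUE;

    LateEventRestamper(long allowedLatenessMs) {
        this.allowedLatenessMs = allowedLatenessMs;
    }

    /** Event time used for windowing: unchanged if within the allowed lateness, else re-stamped. */
    long effectiveEventTime(long eventTime) {
        boolean first = currentEvent == Long.MIN_VALUE;
        if (first || eventTime >= currentEvent - allowedLatenessMs) {
            currentEvent = Math.max(currentEvent, eventTime);
            return eventTime;
        }
        return currentEvent; // too late: move the event into the most recent window
    }

    /** Extra DB column: currentevent/msPerHour - eventtime/msPerHour (integer division). */
    long lateHours(long eventTime) {
        return currentEvent / MS_PER_HOUR - eventTime / MS_PER_HOUR;
    }

    public static void main(String[] args) {
        LateEventRestamper r = new LateEventRestamper(10 * 60_000L); // 10 minutes allowed lateness
        long fiveAm = 5 * MS_PER_HOUR;
        long threeAm = 3 * MS_PER_HOUR;
        System.out.println(r.effectiveEventTime(fiveAm));  // 18000000
        System.out.println(r.effectiveEventTime(threeAm)); // 18000000 (re-stamped)
        System.out.println(r.lateHours(threeAm));          // 2
    }
}
```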
Hi,
In general, a class-level variable is not managed by Flink unless it is declared as state or the function implements the ListCheckpointed interface. Allowing infinite lateness also means that your window contents have to be stored indefinitely. I'm not sure I understand your implementation correctly, but overriding classes that were not intended to be overridden does not seem to be a good solution.

With Flink 1.3 there will be side outputs that let you retrieve all late data as a stream, so that no elements are lost at all: WindowedStream#sideOutputLateData. Maybe this is an option for you.

Hope that helps.
Timo (replying to rizhashmi's post of 16.05.17, 23:25)
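In Flink 1.3+ this is wired up on the `WindowedStream` with `.allowedLateness(...)` and `.sideOutputLateData(lateTag)`, and the late records are read back with `getSideOutput(lateTag)` on the windowed result. The sketch below is not Flink code; it is a self-contained plain-Java model (hypothetical names) of the routing decision only: a record whose tumbling window's max timestamp plus the allowed lateness is already at or below the watermark goes to the side output instead of being dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model (not Flink API) of routing late records to a side output instead of dropping them. */
final class LateDataRouting {
    final long windowSize;
    final long allowedLateness;
    long watermark = Long.MIN_VALUE;
    final Map<Long, List<Long>> windows = new TreeMap<>(); // window start -> buffered timestamps
    final List<Long> lateSideOutput = new ArrayList<>();   // stands in for getSideOutput(lateTag)

    LateDataRouting(long windowSize, long allowedLateness) {
        this.windowSize = windowSize;
        this.allowedLateness = allowedLateness;
    }

    void advanceWatermark(long wm) {
        watermark = Math.max(watermark, wm);
    }

    void process(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, windowSize);
        long maxTimestamp = start + windowSize - 1; // last timestamp inside the window
        if (maxTimestamp + allowedLateness <= watermark) {
            lateSideOutput.add(timestamp); // beyond allowed lateness: side output, not dropped
        } else {
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(timestamp);
        }
    }

    public static void main(String[] args) {
        LateDataRouting r = new LateDataRouting(10, 5);
        r.process(3);
        r.advanceWatermark(20);
        r.process(7);  // window [0, 10) ends at 9; 9 + 5 <= 20, so it is late
        r.process(12); // window [10, 20) is still open
        System.out.println(r.windows);        // {0=[3], 10=[12]}
        System.out.println(r.lateSideOutput); // [7]
    }
}
```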
Thanks Walther,
I am not keeping my windows forever. If an event arrives after the grace period (allowed lateness), I update its event time to the current time, i.e. the last event time. That essentially solves the infinite-state issue. Is 1.3 not stable yet?
In reply to this post by Timo Walther
Yes. Is there any way to access Flink state variables in the WindowAssigner.assignWindows method?
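For context on that question: a tumbling assigner derives the window from the element's timestamp alone, and the `WindowAssigner.WindowAssignerContext` passed to `assignWindows` exposes the current processing time rather than managed state. The plain-Java sketch below mirrors the idea behind Flink's `TimeWindow.getWindowStartWithOffset`, written in a form that also handles negative timestamps; the class name is hypothetical.

```java
/** Hypothetical helper (plain Java): start of the tumbling window that contains a timestamp. */
final class TumblingWindowMath {
    static long windowStart(long timestamp, long offset, long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        // Java's % keeps the sign of the dividend, so shift negative remainders into [0, windowSize).
        return remainder < 0 ? timestamp - (remainder + windowSize) : timestamp - remainder;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        System.out.println(windowStart(5_400_000L, 0L, hour)); // 3600000
        System.out.println(windowStart(-1L, 0L, hour));        // -3600000
    }
}
```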
Hi,
Why don't you set the allowed lateness to Long.MAX_VALUE? This way no data will ever be considered late. If you make the trigger PurgingTrigger.of(EventTimeTrigger.of(…)), you ensure that window state is not kept after a window fires.

Best,
Aljoscha (replying to rizhashmi, 17 May 2017, 13:39)
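Why `Long.MAX_VALUE` does not overflow into "always late": Flink's `WindowOperator` computes a window's cleanup time roughly as `window.maxTimestamp() + allowedLateness`, saturated to `Long.MAX_VALUE` when the sum overflows, and treats an event-time window as late once that cleanup time is at or below the current watermark. The plain-Java model below (hypothetical names, not Flink API) reproduces that check; with `Long.MAX_VALUE` lateness, only a watermark of `Long.MAX_VALUE`, which Flink emits at the end of a bounded input, makes a window late.

```java
/** Plain-Java model (not Flink API) of the event-time lateness check, including the overflow guard. */
final class AllowedLatenessCheck {
    /** Cleanup time = window max timestamp + allowed lateness, saturating at Long.MAX_VALUE on overflow. */
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long cleanup = windowMaxTimestamp + allowedLateness;
        return cleanup >= windowMaxTimestamp ? cleanup : Long.MAX_VALUE; // wrapped around => overflow
    }

    /** A window is late once the watermark has reached its cleanup time. */
    static boolean isWindowLate(long windowMaxTimestamp, long allowedLateness, long watermark) {
        return cleanupTime(windowMaxTimestamp, allowedLateness) <= watermark;
    }

    public static void main(String[] args) {
        long maxTs = 3_599_999L;     // last millisecond of the window [0, 1h)
        long watermark = 7_200_000L; // watermark already at 2h
        System.out.println(isWindowLate(maxTs, 0L, watermark));             // true
        System.out.println(isWindowLate(maxTs, Long.MAX_VALUE, watermark)); // false
    }
}
```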
Could you elaborate on this? If I set the allowed lateness to the maximum, does that mean my windows will be kept forever?
Wouldn't this eventually run out of memory?
Hi,
If you use tumbling or sliding windows, Flink will not keep any additional metadata besides the actual window contents. Also, if you use a trigger that purges when firing, Flink will clear the window contents after firing a window. This means that you can set the allowed lateness to MAX and always process all late-arriving events without keeping any state. Of course, when processing those late events you will get a window that contains only that event, not the complete window that was already processed and purged before.

Best,
Aljoscha (replying to rhashmi, 22 May 2017, 21:51)
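A plain-Java model of this behavior, assuming an event-time trigger wrapped in a purging trigger (in Flink's API, `PurgingTrigger.of(EventTimeTrigger.create())`) and allowed lateness large enough that nothing is dropped. Names are hypothetical and this is not Flink code: it only shows that on-time elements fire together once the watermark passes the window end, while a late element re-fires the same window containing only itself, because the earlier contents were purged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model (not Flink API): event-time trigger that purges on fire, with unbounded lateness. */
final class PurgingWindowModel {
    final long windowSize;
    long watermark = Long.MIN_VALUE;
    final Map<Long, List<Long>> contents = new TreeMap<>(); // window start -> buffered timestamps
    final List<String> firings = new ArrayList<>();         // what the window function would see

    PurgingWindowModel(long windowSize) {
        this.windowSize = windowSize;
    }

    void process(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, windowSize);
        contents.computeIfAbsent(start, k -> new ArrayList<>()).add(timestamp);
        if (start + windowSize - 1 <= watermark) {
            fireAndPurge(start); // window end already passed: a late element fires immediately
        }
    }

    void advanceWatermark(long wm) {
        watermark = Math.max(watermark, wm);
        List<Long> due = new ArrayList<>();
        for (long start : contents.keySet()) {
            if (start + windowSize - 1 <= watermark) {
                due.add(start);
            }
        }
        for (long start : due) {
            fireAndPurge(start);
        }
    }

    private void fireAndPurge(long start) {
        List<Long> elements = contents.remove(start); // purge: nothing is kept after firing
        firings.add("window " + start + " -> " + elements);
    }

    public static void main(String[] args) {
        PurgingWindowModel m = new PurgingWindowModel(10);
        m.process(1);
        m.process(4);
        m.advanceWatermark(10); // fires window [0, 10) with [1, 4] and purges it
        m.process(6);           // late: fires window [0, 10) again, containing only [6]
        m.firings.forEach(System.out::println);
        System.out.println(m.contents.isEmpty()); // true
    }
}
```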
Thanks Aljoscha Krettek,
So the results will not be deterministic for late events. For idempotent updates, I would need to derive an additional key based on the current event time for late events and attach it to the aggregate, probably via some function(maxEventTime, actualEventTime). For that I need maxEventTime to be stored as part of the state and recovered in case of a runtime failure.

Here is my corner case: assume the whole Flink runtime crashed (auto-commit on) and, after recovery, the first event that arrives is from the past (i.e. actually late). Without keeping the max currentTime in state, it may overwrite a previous aggregate.

I was wondering whether I can record my last max event time as part of the checkpoint, or run a query against the sink to find the last processed event time. Any recommendations?
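To make the corner case concrete, here is one hypothetical choice of `function(maxEventTime, actualEventTime)` in plain Java, assuming non-negative epoch-millisecond timestamps and hour buckets: the key combines the event's hour bucket with how many hours behind the maximum event time it arrived. If `maxEventTime` is lost and re-seeded from the first (late) event after recovery, that event gets the same key as the original on-time aggregate and overwrites it.

```java
/** Hypothetical aggregate key built from (maxEventTime, actualEventTime); plain Java, not Flink API. */
final class LateAggregateKey {
    static final long MS_PER_HOUR = 3_600_000L;

    /** Hour bucket of the original event, plus how many hours behind maxEventTime it arrived. */
    static String aggregateKey(long maxEventTime, long actualEventTime) {
        long bucket = actualEventTime / MS_PER_HOUR;
        long lagHours = Math.max(0L, maxEventTime / MS_PER_HOUR - bucket);
        return bucket + ":" + lagHours;
    }

    public static void main(String[] args) {
        long threeAm = 3 * MS_PER_HOUR;
        long fiveAm = 5 * MS_PER_HOUR;
        // On-time event: written under key "3:0".
        System.out.println(aggregateKey(threeAm, threeAm));
        // Same hour arriving late with maxEventTime restored after a crash: distinct key "3:2".
        System.out.println(aggregateKey(fiveAm, threeAm));
        // Same late event if maxEventTime was lost and re-seeded from it: "3:0" again,
        // which would overwrite the earlier aggregate.
        System.out.println(aggregateKey(threeAm, threeAm));
    }
}
```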
Hi,
Yes, I think you can manually store the latest watermark by using the OperatorStateStore that you get if your user function implements the CheckpointedFunction interface.

Best,
Aljoscha (replying to rhashmi, 30 May 2017, 13:43)
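In Flink, `CheckpointedFunction.initializeState(FunctionInitializationContext)` gives access to `context.getOperatorStateStore()`, from which a `ListState<Long>` can be obtained, and `snapshotState(FunctionSnapshotContext)` writes the current maximum into it; `context.isRestored()` tells you whether you are recovering. Because operator list state is redistributed across parallel instances on restore, one option is union list state (`getUnionListState`), where every instance receives all entries and takes their maximum. The sketch below only models that snapshot/restore contract in plain Java with a hypothetical class; it is not the Flink API.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a checkpointed max-event-time tracker (mirrors CheckpointedFunction; not Flink API). */
final class MaxEventTimeTracker {
    private long maxEventTime = Long.MIN_VALUE;

    void observe(long eventTime) {
        maxEventTime = Math.max(maxEventTime, eventTime);
    }

    long maxEventTime() {
        return maxEventTime;
    }

    /** Like snapshotState(): the entries this instance would write into its operator ListState. */
    List<Long> snapshot() {
        List<Long> state = new ArrayList<>();
        state.add(maxEventTime);
        return state;
    }

    /** Like initializeState() on restore with union list state: see every entry, keep the maximum. */
    static MaxEventTimeTracker restore(List<List<Long>> snapshotsFromAllInstances) {
        MaxEventTimeTracker tracker = new MaxEventTimeTracker();
        for (List<Long> snapshot : snapshotsFromAllInstances) {
            for (long value : snapshot) {
                tracker.observe(value);
            }
        }
        return tracker;
    }

    public static void main(String[] args) {
        MaxEventTimeTracker a = new MaxEventTimeTracker();
        MaxEventTimeTracker b = new MaxEventTimeTracker();
        a.observe(10);
        a.observe(50);
        b.observe(30);
        MaxEventTimeTracker restored = restore(List.of(a.snapshot(), b.snapshot()));
        System.out.println(restored.maxEventTime()); // 50
    }
}
```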