State in Custom Tumble Window Class

9 messages

rhashmi
I have a requirement not to reject events, even if they are late (after the maximum allowed lateness). The way I achieve this is by overriding the tumbling window class: if an event is late, I update its event time to the last event time, and for identification I attach an additional column in the DB, computed as (currentevent/msPerHour - eventtime/msPerHour).

currentevent is a class-level variable in the tumbling window class.
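To make the approach above concrete, here is a minimal plain-Java model of it. It has no Flink dependency, so it only illustrates the arithmetic, not the window operator. Assumptions: the class and method names are hypothetical, "late" is approximated as being more than the allowed lateness behind the largest event time seen so far (Flink itself decides lateness from the watermark), and the hour-offset column is only filled for remapped events.

```java
import java.util.concurrent.TimeUnit;

// Plain-Java model of the approach in the post (no Flink dependency).
// Class and method names are hypothetical.
final class LateEventRemapping {
    static final long MS_PER_HOUR = TimeUnit.HOURS.toMillis(1);

    /** Timestamp actually used for windowing, plus the extra "hours late" DB column. */
    static final class Remapped {
        final long timestamp;
        final long hourOffset;

        Remapped(long timestamp, long hourOffset) {
            this.timestamp = timestamp;
            this.hourOffset = hourOffset;
        }
    }

    // "currentevent": the latest event time seen so far.
    // A plain field like this is NOT part of Flink's checkpoints, so it is lost on a crash.
    private long currentEvent = Long.MIN_VALUE;

    Remapped process(long eventTime, long allowedLatenessMs) {
        if (currentEvent == Long.MIN_VALUE || eventTime >= currentEvent) {
            currentEvent = eventTime; // first or in-order event: advance the maximum
            return new Remapped(eventTime, 0L);
        }
        // Assumption: "late" = further behind the max than the allowed lateness
        // (a stand-in for Flink's watermark-based check).
        if (currentEvent - eventTime <= allowedLatenessMs) {
            return new Remapped(eventTime, 0L);
        }
        // Too late: move the event to the latest event time and record how many hours back it was.
        long hourOffset = currentEvent / MS_PER_HOUR - eventTime / MS_PER_HOUR;
        return new Remapped(currentEvent, hourOffset);
    }

    public static void main(String[] args) {
        LateEventRemapping m = new LateEventRemapping();
        long h = MS_PER_HOUR;
        for (long ts : new long[] {10 * h, 10 * h - 1_000, 7 * h + 5}) {
            Remapped r = m.process(ts, h);
            System.out.println(ts + " -> timestamp=" + r.timestamp + ", hourOffset=" + r.hourOffset);
        }
    }
}
```

Note that `currentEvent` here is an ordinary field, which is exactly the kind of value that does not survive a restart unless it is put into Flink-managed state.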


My problem statement is: in case of a full runtime crash, can Flink recover the tumbling window state?

If yes, how?

Re: State in Custom Tumble Window Class

Timo Walther
Hi,

in general, a class-level variable is not managed by Flink if it is not
defined as state or the function does not implement the ListCheckpointed
interface. Allowing infinite lateness also means that your window
content has to be stored indefinitely. I'm not sure if I understand your
implementation correctly, but overriding classes that were not intended
for overriding does not seem to be a good solution. With Flink 1.3 there
will be side outputs that allow you to retrieve all late data as a stream,
so you do not lose any elements: WindowedStream#sideOutputLateData. Maybe
this is an option for you.
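In Flink 1.3+ the side-output route is WindowedStream#sideOutputLateData(OutputTag), with the late elements read back via getSideOutput(tag) on the windowed result. The sketch below is only a plain-Java model of the routing rule as I understand it: an element is late once the watermark has reached its window's last timestamp plus the allowed lateness. Class and method names are hypothetical, timestamps are assumed non-negative, and windows have no offset.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of routing late elements to a separate list instead of
// dropping them (no Flink dependency; all names are hypothetical).
final class LateDataRouting {
    final List<Long> onTime = new ArrayList<>();
    final List<Long> late = new ArrayList<>(); // what the late-data side output would carry

    // Start of the tumbling window containing ts (assumes ts >= 0 and no window offset).
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Late = the watermark has reached the window's last timestamp plus the allowed
    // lateness. Assumes allowedLateness is small enough that the sum does not overflow.
    static boolean isLate(long ts, long size, long allowedLateness, long watermark) {
        long maxTimestamp = windowStart(ts, size) + size - 1;
        return maxTimestamp + allowedLateness <= watermark;
    }

    void route(long[] timestamps, long size, long allowedLateness, long watermark) {
        for (long ts : timestamps) {
            if (isLate(ts, size, allowedLateness, watermark)) {
                late.add(ts);
            } else {
                onTime.add(ts);
            }
        }
    }

    public static void main(String[] args) {
        LateDataRouting r = new LateDataRouting();
        r.route(new long[] {27, 12, 25, 9}, 10, 0, 25);
        System.out.println("onTime=" + r.onTime + " late=" + r.late); // prints onTime=[27, 25] late=[12, 9]
    }
}
```

With a watermark of 25 and 10 ms windows, the elements from the already-closed windows [10, 20) and [0, 10) end up in `late` rather than being discarded.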

Hope that helps.

Timo


On 16.05.17 at 23:25, rizhashmi wrote:

> i have requirement not to reject events .. even if they are late(after
> maximum allowedness). So the way i achieve this but overriding Tumbling
> window class and update event time to last event time if the event is late
> and for identification attached additional column in db as
> (currentevent/msPerHour - eventtime/msPerHour)
>
> currentevent is a class level variable in Tumbling window.
>
>
> My problem statement is, in case of full runtime crash, can flink recover
> tumblewindow state?
>
> if yes how?
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/State-in-Custom-Tumble-Window-Class-tp13177.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.


Re: State in Custom Tumble Window Class

rhashmi
Thanks Walther,

I am not keeping my window forever. If an event arrives after the grace period (allowed lateness), I update its event time to the current time, or rather the last event time. That essentially solves the infinite-storage issue.

Is 1.3 not stable yet?

Re: State in Custom Tumble Window Class

rhashmi
In reply to this post by Timo Walther
Yes. Is there any way to access Flink state variables in the WindowAssigner.assignWindows method?

Re: State in Custom Tumble Window Class

Aljoscha Krettek
Hi,

Why don’t you set the allowed lateness to Long.MAX_VALUE? This way no data will ever be considered late. If you set the trigger to PurgingTrigger.of(EventTimeTrigger.create()), you ensure that window state is not kept after a window fires.
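A minimal plain-Java model of why a lateness of Long.MAX_VALUE means "never late": a window's cleanup time (its last timestamp plus the allowed lateness) would overflow, so it is clamped to Long.MAX_VALUE. This follows my reading of Flink's WindowOperator and may differ between versions; the names below are hypothetical.

```java
// Plain-Java model (no Flink dependency) of the event-time "is this window late?" check.
// Names are hypothetical.
final class UnboundedLateness {
    // Time after which a window's state may be cleaned up: its last timestamp
    // (windowEnd - 1) plus the allowed lateness, saturating at Long.MAX_VALUE.
    static long cleanupTime(long windowEnd, long allowedLateness) {
        long maxTimestamp = windowEnd - 1;
        long cleanup = maxTimestamp + allowedLateness;
        return cleanup >= maxTimestamp ? cleanup : Long.MAX_VALUE; // overflow guard
    }

    static boolean isWindowLate(long windowEnd, long allowedLateness, long watermark) {
        return cleanupTime(windowEnd, allowedLateness) <= watermark;
    }

    public static void main(String[] args) {
        // Zero lateness: the window [0, 1000) is late once the watermark reaches 999.
        System.out.println(isWindowLate(1000, 0, 999));
        // Long.MAX_VALUE lateness: no watermark below Long.MAX_VALUE makes the window late.
        System.out.println(isWindowLate(1000, Long.MAX_VALUE, Long.MAX_VALUE - 1));
    }
}
```

The flip side is that this cleanup time is never reached, so whatever a non-purging trigger leaves in the window would stay in state; that is why pairing unbounded lateness with a purging trigger matters.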

Best,
Aljoscha

> On 17. May 2017, at 13:39, rizhashmi <[hidden email]> wrote:
>
> Yes .. is there any possibility access flink state variables in
> WindowAssigner.assignWindows method?

Re: State in Custom Tumble Window Class

rhashmi
Could you elaborate on this? If I set the window time (allowed lateness) to the max, does that mean my window will stay around in the framework indefinitely?
Wouldn't this eventually cause a memory overflow?

Re: State in Custom Tumble Window Class

Aljoscha Krettek
Hi,

If you use tumbling or sliding windows, Flink will not keep any additional metadata besides the actual window contents. Also, if you use a Trigger that purges when firing, Flink will clean up the window contents after firing a window. This means that you can set the allowed lateness to MAX and always process all late-arriving events without keeping any state around after a window fires. Of course, when processing those late events you will get a window that only contains that event, not the complete window that was already processed and purged before.
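The behavior described above can be sketched as a small plain-Java model (no Flink dependency, hypothetical names): tumbling windows, allowed lateness treated as unbounded, and a trigger that fires when the watermark passes the window end and purges the contents on every firing. It is meant to show why a late element produces a firing that contains only that element.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of tumbling event-time windows with a purging event-time
// trigger and unbounded allowed lateness. Names are hypothetical.
final class PurgingWindowModel {
    private final long size;
    private long watermark = Long.MIN_VALUE;
    // Window start -> buffered elements; an entry is removed as soon as its window fires.
    private final Map<Long, List<Long>> contents = new HashMap<>();
    // Contents seen by the window function at each firing, in firing order.
    final List<List<Long>> firings = new ArrayList<>();

    PurgingWindowModel(long size) {
        this.size = size;
    }

    void onElement(long ts) {
        long start = ts - (ts % size); // assumes ts >= 0
        contents.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
        // A window whose last timestamp is already behind the watermark fires immediately.
        if (start + size - 1 <= watermark) {
            fireAndPurge(start);
        }
    }

    void onWatermark(long wm) {
        watermark = wm;
        List<Long> ready = new ArrayList<>();
        for (long start : contents.keySet()) {
            if (start + size - 1 <= wm) {
                ready.add(start);
            }
        }
        ready.sort(null); // natural order: fire older windows first
        for (long start : ready) {
            fireAndPurge(start);
        }
    }

    // Fire and purge: hand the window contents to the "window function", then drop them.
    private void fireAndPurge(long start) {
        firings.add(contents.remove(start));
    }

    public static void main(String[] args) {
        PurgingWindowModel w = new PurgingWindowModel(10);
        w.onElement(1);
        w.onElement(5);
        w.onWatermark(9); // window [0, 10) fires with [1, 5] and is purged
        w.onElement(3);   // late element: fires window [0, 10) again, containing only 3
        w.onElement(12);  // window [10, 20) is still open
        System.out.println(w.firings); // prints [[1, 5], [3]]
    }
}
```

The second firing for window [0, 10) sees only the late element, which is why results for late events are partial and need to be merged or keyed carefully downstream.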

Best,
Aljoscha

> On 22. May 2017, at 21:51, rhashmi <[hidden email]> wrote:
>
> Could you elaborate this more? If i assume if i set window time to max ..
> does it mean my window will stay for infinite time framework,
> Wouldn't this may hit memory overflow with time?

Re: State in Custom Tumble Window Class

rhashmi
Thanks Aljoscha Krettek,

So the results will not be deterministic for late events. For idempotent updates, I would need to derive an additional key based on the current event time if events are late, and attach it to the aggregate, which is probably possible via some function(maxEventTime, actualEventTime). For that I need maxEventTime to be stored as part of the state and recovered in case of a runtime failure.

Here is my corner case:
-- Assume the whole Flink runtime crashed (auto-commit on) and, after recovery, the first event that arrives is from the past (actually late). Without keeping the max currentTime state, it may override a previous aggregate.

I was wondering if I can record my last max event time as part of a checkpoint,
or run a query against the sink to find the last processed event time.

Any recommendation?
 
Re: State in Custom Tumble Window Class

Aljoscha Krettek
Hi,
Yes, I think you can manually store the latest watermark by using the OperatorStateStore that you get (from the FunctionInitializationContext passed to initializeState()) if your user function implements the CheckpointedFunction interface.
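A sketch of that pattern as a plain-Java model, without the Flink dependency: `snapshot()` and `restore()` stand in for CheckpointedFunction#snapshotState and #initializeState, where in Flink the list would come from `context.getOperatorStateStore().getListState(descriptor)`. Assumptions: it tracks the max event time (what the corner case above needs) rather than the watermark, the hour-bucket offset mirrors the (currentevent/msPerHour - eventtime/msPerHour) column from earlier in the thread, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Plain-Java model of keeping the max event time in checkpointed, list-shaped
// operator state. Names are hypothetical; snapshot()/restore() play the roles of
// CheckpointedFunction#snapshotState and #initializeState.
final class MaxEventTimeTracker {
    static final long MS_PER_HOUR = TimeUnit.HOURS.toMillis(1);
    private long maxEventTime = Long.MIN_VALUE;

    // Returns how many whole hours the event lies behind the max seen so far (0 if not behind).
    long observe(long eventTime) {
        if (maxEventTime == Long.MIN_VALUE || eventTime >= maxEventTime) {
            maxEventTime = eventTime;
            return 0L;
        }
        return maxEventTime / MS_PER_HOUR - eventTime / MS_PER_HOUR;
    }

    // snapshotState: write the current value into the list state.
    List<Long> snapshot() {
        List<Long> state = new ArrayList<>();
        if (maxEventTime != Long.MIN_VALUE) {
            state.add(maxEventTime);
        }
        return state;
    }

    // initializeState on restore: a subtask may receive several entries (or none)
    // after redistribution, so keep the largest one.
    static MaxEventTimeTracker restore(List<Long> state) {
        MaxEventTimeTracker t = new MaxEventTimeTracker();
        for (long v : state) {
            t.maxEventTime = Math.max(t.maxEventTime, v);
        }
        return t;
    }

    public static void main(String[] args) {
        long h = MS_PER_HOUR;
        MaxEventTimeTracker before = new MaxEventTimeTracker();
        before.observe(10 * h);
        List<Long> checkpoint = before.snapshot();
        // "Crash": restart from the checkpoint vs. from scratch, then a 3-hour-old event arrives.
        System.out.println("restored: " + restore(checkpoint).observe(7 * h)); // prints restored: 3
        System.out.println("fresh:    " + new MaxEventTimeTracker().observe(7 * h)); // prints fresh:    0
    }
}
```

The fresh tracker reports an offset of 0 for the 3-hour-old event, which is the corner case from the question: without the restored maximum, the late event looks on-time and could overwrite the earlier aggregate. Taking the maximum on restore keeps the merge correct if list-state entries are redistributed across subtasks.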

Best,
Aljoscha

> On 30. May 2017, at 13:43, rhashmi <[hidden email]> wrote:
>
> Thanks Aljoscha Krettek,
>
> So the results will not be deterministic for late events. For idempotent
> update, i would need to find an additional key base of current event time if
> they are late and attached to the aggregator which probably possible by
> doing some function(maxEventTime, actualEventTime). For that i need
> maxEventTime to be stored as part of state & recover in case of runtime
> failure.
>
> Here is my corner case like.
> -- If assume whole flink runtime crashed(auto commit on) & after recovery
> the first event arrived is from past(actually late). Without keeping max
> currentTime state, may potentially override previous aggregate.
>
> I was wondering if i can record my last max EventTime as part of checkPoint,
> or run query against sink source to find last processed eventtime.
>
> Any recommendation?