Access to time in aggregation, or aggregation in ProcessWindowFunction?

William Saar-2
Hi,
I am looking to implement a window that sends out updates for each new event it receives and also when an expiration timer fires and purges the window (the expiration time can be determined from a timestamp in the first event).

I can't figure out a way to do this without preserving all events in the window. It seems I would need either to check the current watermark when an aggregation or its window function is evaluated, so that I can fire the final update when the timer fires, or to have the ProcessWindowFunction (where I do have access to the time) not preserve all elements in the window.

The only way I've come up with is to use a ProcessWindowFunction that keeps state so that it only sends out updates for new elements in the elements iterable. The ProcessWindowFunction then also sends out an update when the first element's timestamp meets the expiration condition, or if the elements iterable does not contain any new elements (deducing that the processing must have been triggered by a timer and not by a new element). Is there a better way to do this?
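
To make the workaround above concrete, here is a rough plain-Java model of it. This is only a sketch and deliberately does not use the Flink API: `Event`, `IncrementalWindowEmitter`, `TTL_MS`, and the running sum are hypothetical names and choices made for illustration. It shows the function remembering how many elements it has already reported, emitting updates only for the new ones, and emitting a final update when the first element has expired or when nothing new arrived. The whole element list is still passed in on every call, which is exactly the cost being asked about.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical event type: an event-time timestamp plus a value to aggregate. */
final class Event {
    final long timestamp;
    final int value;

    Event(long timestamp, int value) {
        this.timestamp = timestamp;
        this.value = value;
    }
}

/** Plain-Java model of the stateful window-function workaround (not Flink API). */
final class IncrementalWindowEmitter {
    // Assumed expiration rule: the window expires TTL_MS after the first event.
    static final long TTL_MS = 100;

    private int emittedCount = 0; // elements already reported in earlier calls
    private int runningSum = 0;   // aggregate over the elements seen so far

    /** Called on every firing with ALL elements currently buffered in the window. */
    List<String> process(List<Event> elements, long currentWatermark) {
        List<String> out = new ArrayList<>();
        boolean sawNew = elements.size() > emittedCount;

        // Only report elements that were not part of an earlier firing.
        for (int i = emittedCount; i < elements.size(); i++) {
            runningSum += elements.get(i).value;
            out.add("update:" + runningSum);
        }
        emittedCount = elements.size();

        // Final update: the first element has expired, or no new element arrived
        // (so the call is assumed to have been caused by a timer).
        boolean expired = !elements.isEmpty()
                && currentWatermark >= elements.get(0).timestamp + TTL_MS;
        if (expired || !sawNew) {
            out.add("final:" + runningSum);
        }
        return out;
    }
}
```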

Thanks,
William

Re: Access to time in aggregation, or aggregation in ProcessWindowFunction?

Nico Kruber
Hi William,
I'm not quite sure what you are trying to achieve...

What constitutes a "new event"? Is this based on some key? If so, you may
group on that key, create a window, and use a custom trigger [1] instead, where
you can react in onElement() and set up an event-time timer for the first
element, and then react in onEventTime() for your timeout.
A ProcessFunction [2] (without a window) looks like a better solution, though,
depending on the details.
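
As a rough illustration of the windowless idea, the following plain-Java sketch models the logic only; it is not Flink code. In a real job this would presumably map onto a ProcessFunction on a keyed stream, with the per-key aggregate held in keyed state (for example a ValueState) and the expiration registered through the timer service in processElement(), with the final update emitted from onTimer(). The names `ExpiringAggregator`, `TTL_MS`, `advanceWatermark`, and the sum aggregate are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of per-key state plus an event-time expiration timer. */
final class ExpiringAggregator {
    // Assumed rule: a key's aggregate expires TTL_MS after its first event.
    static final long TTL_MS = 100;

    /** Everything kept per key: the running aggregate and the timer timestamp. */
    private static final class KeyState {
        long sum;
        long expiresAt;
    }

    // TreeMap keeps iteration order deterministic for this example.
    private final Map<String, KeyState> state = new TreeMap<>();
    private final List<String> output = new ArrayList<>();

    /** Models processElement(): update the aggregate and emit immediately. */
    void processElement(String key, long timestamp, long value) {
        KeyState s = state.get(key);
        if (s == null) {
            s = new KeyState();
            s.expiresAt = timestamp + TTL_MS; // "register" the timer on the first event
            state.put(key, s);
        }
        s.sum += value;
        output.add(key + " update " + s.sum);
    }

    /** Models timers firing once the watermark reaches their timestamp. */
    void advanceWatermark(long watermark) {
        Iterator<Map.Entry<String, KeyState>> it = state.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, KeyState> e = it.next();
            if (e.getValue().expiresAt <= watermark) {
                output.add(e.getKey() + " final " + e.getValue().sum);
                it.remove(); // purge the state, like clearing keyed state in onTimer()
            }
        }
    }

    List<String> output() {
        return output;
    }
}
```

The point of the design is that only the running aggregate and one timestamp are stored per key, so no individual events need to be retained between updates.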


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#triggers
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html

On Tuesday, 20 June 2017 12:52:38 CEST William Saar wrote:

> Hi,
> I am looking to implement a window that sends out updates for each new
> event it receives and also when an expiration timer fires and purges
> the window (the expiration time can be determined from a timestamp in
> the first event).
>
> I can't figure out a way to do this that does not require preserving
> all events in the window. It seems I would either need to be able to
> check the current watermark when an aggregation or its window function
> is evaluated to be able to fire the final update when the timer fires,
> or I would need the ProcessWindowFunction (where I do have access to
> the time) to not preserve all elements in the window.
>
> The only way I've come up with to implement this is to use a
> ProcessWindowFunction that keeps state to only send out updates for
> new elements in the elements iterable. The ProcessWindowFunction then
> also sends out an update when the first element timestamp meets the
> expiration condition, or if the elements iterable parameter does not
> contain any new elements (deducing that the processing must have been
> triggered by a timer invocation and not a new element). Is there a
> better way to do this?
>
> Thanks,
> William



Re: Access to time in aggregation, or aggregation in ProcessWindowFunction?

William Saar-2
Hi,
That looks perfect! I realized I could probably use an Evictor together with my ProcessWindowFunction to prevent the window from preserving all of its elements, but ditching the window looks even better.

Thanks a lot!

William



----- Original Message -----
From: "Nico Kruber" <[hidden email]>
To: <[hidden email]>
Cc: "William Saar" <[hidden email]>
Sent: Tue, 20 Jun 2017 18:20:01 +0200
Subject: Re: Access to time in aggregation, or aggregation in ProcessWindowFunction?


Hi William,
I'm not quite sure what you are trying to achieve...

What constitutes a "new event"? Is this based on some key? If so, you may
group on that key, create a window, and use a custom trigger [1] instead, where
you can react in onElement() and set up an event-time timer for the first
element, and then react in onEventTime() for your timeout.
A ProcessFunction [2] (without a window) looks like a better solution, though,
depending on the details.


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#triggers
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html
