Updating elements of a window in regular intervals



Anton W. Haubner

Hello!

I hope this is the correct mailing list for newbie questions about Flink stream processing.

Essentially, I have a question about how to apply a transformation to each individual element of a sliding window in regular intervals.


I think a little background to the problem I'm trying to solve could be helpful before asking the concrete question:
I have a service A which continuously produces events, and another service B which accepts collections of processed events.

The collections accepted by the latter service are produced from the events received within the last minute. So what I am currently doing is using timeWindowAll to buffer a sliding window of one minute and an aggregate function that produces arrays of events from the windows. These arrays are then sent to the consumer service:

<event source A>
...
.timeWindowAll(Time.seconds(60), Time.seconds(1))
.aggregate(<collect events of window into an array>)
.addSink(<send to service B>)

This works, but I need to add another piece of functionality: before being sent off to the consumer service B, every event has to be annotated with a value computed from the time that has passed since the event was produced. What I am currently doing is applying a map function to the stream of produced arrays. This seems awfully inefficient to me, since each call of the map function has to work on the whole content of a window (now contained in the array):

<event source A>
...
.timeWindowAll(Time.seconds(60), Time.seconds(1))
.aggregate(<collect events of window into an array>)
.map(<iterate over array, producing a new one with modified events>)
.addSink(<send to service B>)
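For reference, the work that map step performs on each emitted array could be sketched like this in plain Java (the Event and Annotated types and their field names are hypothetical, not from my actual job):

```java
import java.util.ArrayList;
import java.util.List;

public class AnnotateStep {
    // Hypothetical event type; only the production timestamp matters here.
    record Event(long producedAtMillis) {}
    // The event plus the annotation computed from elapsed time.
    record Annotated(long producedAtMillis, long ageMillis) {}

    // What the map step does to each window array: annotate every event
    // with the time that has passed since it was produced.
    static List<Annotated> annotate(List<Event> window, long nowMillis) {
        List<Annotated> out = new ArrayList<>();
        for (Event e : window) {
            out.add(new Annotated(e.producedAtMillis(), nowMillis - e.producedAtMillis()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Annotated> a = annotate(List.of(new Event(1_000), new Event(4_000)), 10_000);
        System.out.println(a.get(0).ageMillis()); // 9000
        System.out.println(a.get(1).ageMillis()); // 6000
    }
}
```

Note that this loop runs over the full one-minute window once per second, which is exactly the redundancy I would like to avoid.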

Instead, I wonder whether it is possible to apply a map function to the individual elements of a window. As I understand it, this is not currently possible (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Filtering-and-mapping-data-after-window-opertator-td21449.html#a21458).

Another idea would be to add keyed windows before the timeWindowAll, using the event timestamp modulo the parallelism as key, and to perform the event transformation while aggregating each keyed window into an array. The computation could then be performed in parallel on these smaller windows, and afterwards I would join the produced arrays:

...
.keyBy(event -> event.timestamp % parallelism)
.timeWindow(Time.seconds(60), Time.seconds(1))
.reduce(<apply mapping and combine into array>)
.timeWindowAll(Time.seconds(60), Time.seconds(1))
.aggregate(<join arrays of last minute>)
.addSink(<consumer service B>)
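In isolation, the artificial keying on the first line of that sketch would look like this (plain Java; the Event type and the parallelism value are assumptions):

```java
public class KeyAssignment {
    // Hypothetical event carrying a millisecond timestamp.
    record Event(long timestamp) {}

    // The artificial key: event timestamp modulo the desired parallelism,
    // yielding `parallelism` stable pseudo-keys.
    static long key(Event e, int parallelism) {
        return e.timestamp() % parallelism;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        // Consecutive timestamps cycle through keys 0..3,
        // spreading events round-robin over the keyed windows.
        for (long t = 100; t < 108; t++) {
            System.out.println(key(new Event(t), parallelism)); // 0 1 2 3 0 1 2 3
        }
    }
}
```

The intent is only to spread load evenly, not to group events semantically; any stable assignment of events to `parallelism` buckets would do.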

What do you think of this idea? Is there a better way to handle this?

Thank you for your help.
Best regards,

Anton



Re: Updating elements of a window in regular intervals

Timo Walther
Hi Anton,

in many scenarios it might be better to just use a ProcessFunction, because
you might reach the limits of the built-in window functions very quickly.
ProcessFunction gives you full flexibility: you can put whatever you like
into state and set/fire timers whenever you think the time is appropriate.
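A rough sketch of the buffer-and-timer logic such a ProcessFunction would implement, simulated here in plain Java with no Flink dependencies (the Event type, the one-minute horizon, and the method names are assumptions, not Flink API):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class ProcessFunctionSketch {
    record Event(long producedAtMillis) {}
    record Annotated(long producedAtMillis, long ageMillis) {}

    static final long HORIZON_MS = 60_000; // keep events from the last minute

    // State a ProcessFunction would hold: the buffered events, oldest first.
    private final Deque<Event> buffer = new ArrayDeque<>();

    // Analogue of processElement(): buffer every incoming event.
    void onEvent(Event e) {
        buffer.addLast(e);
    }

    // Analogue of onTimer(), fired e.g. once per second: evict events older
    // than the horizon and emit the annotated snapshot for service B.
    List<Annotated> onTimer(long nowMillis) {
        while (!buffer.isEmpty()
                && buffer.peekFirst().producedAtMillis() < nowMillis - HORIZON_MS) {
            buffer.removeFirst();
        }
        List<Annotated> out = new ArrayList<>();
        for (Event e : buffer) {
            out.add(new Annotated(e.producedAtMillis(), nowMillis - e.producedAtMillis()));
        }
        return out;
    }

    public static void main(String[] args) {
        ProcessFunctionSketch f = new ProcessFunctionSketch();
        f.onEvent(new Event(1_000));
        f.onEvent(new Event(30_000));
        // At t=62s the event from t=1s has aged out of the one-minute window.
        List<Annotated> snapshot = f.onTimer(62_000);
        System.out.println(snapshot.size());             // 1
        System.out.println(snapshot.get(0).ageMillis()); // 32000
    }
}
```

In an actual KeyedProcessFunction the buffer would live in Flink state (e.g. ListState) and the timer would be registered via the timer service, but the control flow is the same.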

In general, timeWindowAll is not a good idea because there is no way of
parallelizing your stream; the window will be executed on a single node.
Introducing artificial but stable keys is one way to distribute the load,
but you could also use rebalance() for this purpose.

However, I don't understand the `timestamp % parallelism` logic.

Regards,
Timo


On 15.01.21 11:53, Anton W. Haubner wrote:

> Hello!
>
> I hope this is the correct mailing list for newb questions regarding
> flink stream processing.
>
> Essentially, I have a question about how to apply a transformation to
> each individual element of a sliding window in regular intervals.
>
>
> I think a little background to the problem I'm trying to solve could be
> helpful before asking the concrete question:
> I have a service *A* which continuously produces events, and another
> service *B* which accepts collections of processed events.
>
> The collections accepted by the latter service are produced from the
> events received within the last minute. So what i am currently doing is
> using timeWindowAll to buffer a sliding window of 1 minute size and a
> aggregate function which produces arrays of events from the windows.
> These arrays are then sent to the consumer service:
>
> <event source A>
> ...
> .timeWindowAll(Time.seconds(60), Time.seconds(1))
> .aggregate(<collect events of window into an array>)
> .addSink(<send to service B>)
>
> /This works,/ but i need to add another functionality: Before being sent
> off to the consumer service *B*, all events have to be annotated with a
> value which needs to be computed based on the time that passed since the
> event was produced. What I am currently doing is, is applying a map
> function to the stream of produced arrays. This seems awfully
> inefficient to me, since each call of the map function has to work on
> the whole content of a window (now contained in the array):
>
> <event source A>
> ...
> .timeWindowAll(Time.seconds(60), Time.seconds(1))
> .aggregate(<collect events of window into an array>)
> .map(<iterate over array, producing a new one with modified events>)
> .addSink(<send to service B>)
>
> Instead, i wonder if it was possible to apply a map function to the
> elements of a window. As I understand it, this is not currently possible
> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Filtering-and-mapping-data-after-window-opertator-td21449.html#a21458).
>
> Another idea would be, to add keyed windows before the timeWindowAll
> using the event timestamp value modulo parallelism as keys and perform
> the event transformation while aggregating each window into an array.
> Then the computation could be performed in parallel on these smaller
> windows and afterwards I join the produced arrays:
>
> ...
> .keyBy(event -> event.timestamp % parallelism)
> .timeWindow(Time.seconds(60), Time.seconds(1))
> .reduce(<apply mapping and combine into array>)
> .timeWindowAll(Time.seconds(60), Time.seconds(1))
> .aggregate(<join arrays of last minute>)
> .addSink(<consumer service B>)
>
> What do you think of this idea? Is there a better way to handle this?
>
> Thank you for your help.
> Best regards,
>
> Anton
>