Associative operation + windowAll - possible parallelism


Debski
Let us assume that I want to perform some kind of aggregation over fixed time windows (e.g. a tumbling window of 1 minute) and that my aggregation operation is associative. Wouldn't it be possible to represent windowAll at runtime as parallelism + 1 operator instances, where parallelism instances compute partial aggregates and the partial results are then merged into one in the last instance, using the merge function that is already present in AggregateFunction?

Basically, I would like to compute a single aggregated value for all events in a given time window, where the aggregation operation itself can be parallelized.

For example, I could have a stream mapped with a .map operation that has parallelism 4. Each map operator instance would then pass its 1/4 of the events to an adjacent instance of the windowAll operator, which would compute the desired aggregate over that subset of events. When the window closes, all partial states would be transferred to a single merging windowAll operator.
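The scheme described above can be illustrated outside of Flink. The following is a minimal plain-Java sketch, not Flink code: the class `PartialAggregationSketch`, the accumulator `AvgAcc`, and the round-robin distribution are all hypothetical names and choices made for illustration. It computes an average once in a single instance and once via `parallelism` partial accumulators that are merged at the end.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of "parallelism + 1" aggregation; no Flink classes are used.
final class PartialAggregationSketch {

    // Accumulator for an average. Merging is associative because sums and counts add up.
    static final class AvgAcc {
        long sum;
        long count;

        void add(long value) {
            sum += value;
            count++;
        }

        AvgAcc merge(AvgAcc other) {
            AvgAcc merged = new AvgAcc();
            merged.sum = this.sum + other.sum;
            merged.count = this.count + other.count;
            return merged;
        }

        double result() {
            return count == 0 ? 0.0 : (double) sum / count;
        }
    }

    // One instance sees every event of the window.
    static double aggregateSequentially(List<Long> windowEvents) {
        AvgAcc acc = new AvgAcc();
        for (long value : windowEvents) {
            acc.add(value);
        }
        return acc.result();
    }

    // "parallelism" partial instances, followed by one final merge step.
    static double aggregateInParallel(List<Long> windowEvents, int parallelism) {
        List<AvgAcc> partials = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partials.add(new AvgAcc());
        }
        // Assumption: events are spread round-robin over the partial instances.
        for (int i = 0; i < windowEvents.size(); i++) {
            partials.get(i % parallelism).add(windowEvents.get(i));
        }
        // The single merging instance combines all partial accumulators.
        AvgAcc merged = new AvgAcc();
        for (AvgAcc partial : partials) {
            merged = merged.merge(partial);
        }
        return merged.result();
    }

    public static void main(String[] args) {
        List<Long> events = List.of(4L, 8L, 15L, 16L, 23L, 42L);
        System.out.println(aggregateSequentially(events));
        System.out.println(aggregateInParallel(events, 4));
    }
}
```

Both calls yield the same value for this input. Note that because events reach the partial instances in arbitrary order, the operation should be commutative as well as associative for the results to agree in general.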

Are there any plans to support such situations, or is it possible to somehow implement such an operator in the current version of Flink?

Also, there is a note in the windowAll Javadoc about possible parallelism, but I don't know how relevant it is to my case:

Note: This operation can be inherently non-parallel since all elements have to pass through the same operator instance. (Only for special cases, such as aligned time windows is it possible to perform this operation in parallel).
Re: Associative operation + windowAll - possible parallelism

Aljoscha Krettek
Yes, your observations are correct!

Currently, I see two possible solutions that you could implement as a user:

1. Use .window() with a dummy key followed by a .windowAll():

DataStream<T> input = …;
input
  .map( (in) -> new Tuple2<Integer, T>(<rand int>, in))
  .keyBy(0)
  .window(…)
  .aggregate(...)
  .windowAll(…)
  .aggregate(…)

The problem here is that the keyBy incurs a shuffle, so this approach may or may not improve performance, depending on the aggregation operation.
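To make the data flow of option 1 concrete, here is a minimal plain-Java simulation. It is a sketch under stated assumptions and does not use the Flink API: the class `DummyKeyTwoStageSketch`, the `Event` type, the one-minute window size, the sum aggregation, and non-negative timestamps are all assumptions made for illustration. Stage 1 mimics the keyed window (one partial per dummy key and window), and stage 2 mimics the final windowAll merge.

```java
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

// Plain-Java simulation of "dummy key + window, then windowAll"; no Flink classes are used.
final class DummyKeyTwoStageSketch {
    static final long WINDOW_SIZE_MS = 60_000L; // assumed tumbling window of 1 minute

    static final class Event {
        final long timestampMs;
        final long value;

        Event(long timestampMs, long value) {
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    // Start of the tumbling window containing the timestamp (assumes timestampMs >= 0).
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Stage 1: partial sum per (window, dummy key), like keyBy(dummy key) + window + aggregate.
    static Map<Long, Map<Integer, Long>> preAggregate(List<Event> events, int numKeys, Random random) {
        Map<Long, Map<Integer, Long>> partials = new TreeMap<>();
        for (Event e : events) {
            int dummyKey = random.nextInt(numKeys); // random key only spreads the load
            partials.computeIfAbsent(windowStart(e.timestampMs), w -> new TreeMap<>())
                    .merge(dummyKey, e.value, Long::sum);
        }
        return partials;
    }

    // Stage 2: one total per window, like windowAll + aggregate over the partial results.
    static Map<Long, Long> mergePartials(Map<Long, Map<Integer, Long>> partials) {
        Map<Long, Long> totals = new TreeMap<>();
        partials.forEach((window, perKey) ->
                perKey.values().forEach(partial -> totals.merge(window, partial, Long::sum)));
        return totals;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(1_000L, 5), new Event(30_000L, 7), new Event(59_999L, 1),
                new Event(60_000L, 10), new Event(90_000L, 20));
        System.out.println(mergePartials(preAggregate(events, 4, new Random(42))));
    }
}
```

The final totals do not depend on which dummy key each event receives. The random key only determines how the work is distributed across the partial aggregations.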

To get around that shuffle you would have to use option 2.

2. Use a custom StreamOperator that does the pre-aggregation and emits results on event-time, followed by .windowAll():

DataStream<T> input = …;
input
  .transform(<name>, <type info>, new PreAggregationOperator())
  .windowAll(…)
  .aggregate(…)

Here, PreAggregationOperator would pre-aggregate, checkpoint the pre-aggregated values, and emit the pre-aggregate when the watermark for the end of a window arrives. You have to use a custom operator because a user function cannot “listen” on the watermark and therefore would not be able to emit the aggregate at the right time.
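The core logic of such a pre-aggregation step can be sketched in plain Java. This is only an illustration of the idea, not a Flink StreamOperator: the class `PreAggregationSketch`, its method names, the one-minute window size, and the sum aggregation are hypothetical, and checkpointing is omitted. It keeps one partial sum per window and emits it once the watermark reaches the last timestamp of that window.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of watermark-driven pre-aggregation; no Flink classes are used.
final class PreAggregationSketch {
    static final long WINDOW_SIZE_MS = 60_000L; // assumed tumbling window of 1 minute

    // Partial sum per window start. A real operator would keep this in checkpointed state.
    private final TreeMap<Long, Long> partialSums = new TreeMap<>();

    // Emitted {windowStart, partialSum} pairs, standing in for the operator output.
    final List<long[]> emitted = new ArrayList<>();

    // Called for every incoming event (assumes timestampMs >= 0).
    void processElement(long timestampMs, long value) {
        long windowStart = timestampMs - (timestampMs % WINDOW_SIZE_MS);
        partialSums.merge(windowStart, value, Long::sum);
    }

    // Called for every watermark: emit and drop all windows that the watermark has passed.
    void processWatermark(long watermarkMs) {
        Iterator<Map.Entry<Long, Long>> it = partialSums.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> entry = it.next();
            long windowMaxTimestamp = entry.getKey() + WINDOW_SIZE_MS - 1;
            if (windowMaxTimestamp > watermarkMs) {
                break; // windows are sorted by start, so all later ones are still open
            }
            emitted.add(new long[] {entry.getKey(), entry.getValue()});
            it.remove();
        }
    }

    public static void main(String[] args) {
        PreAggregationSketch op = new PreAggregationSketch();
        op.processElement(1_000L, 5);
        op.processElement(30_000L, 7);
        op.processElement(61_000L, 10);
        op.processWatermark(59_999L); // closes the window starting at 0
        for (long[] partial : op.emitted) {
            System.out.println("window " + partial[0] + " partial sum " + partial[1]);
        }
    }
}
```

In an actual operator, the emitted partial result would presumably also need a timestamp that falls inside the same window, so that the downstream .windowAll() assigns it to the matching window, and the watermark itself would have to be forwarded downstream.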

I hope this helps.

Best,
Aljoscha

> On 11. Jul 2017, at 22:05, Debski <[hidden email]> wrote:
> [...]


Re: Associative operation + windowAll - possible parallelism

Debski
Thanks for the suggestions, I will take a look at the transform function.