Hi,

yes, this works well in some cases, and I was also thinking about adding something like this to Flink.
There can be problems if you use a trigger other than EventTimeTrigger that may fire multiple times per window, or if you specify an allowed lateness. In those cases, you would overcount elements in the all-window, because every firing of a subwindow is forwarded to it and added to the result.
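To make the overcounting concrete, here is a plain-Java simulation (no Flink involved; the class and method names are hypothetical). It models the second stage as summing every partial count it receives for one window, and it assumes each firing of a subwindow emits that subwindow's accumulated count (a trigger returning FIRE rather than FIRE_AND_PURGE). When every subwindow fires exactly once the sum is correct; when one subwindow fires early and again at the end of the window, both firings are added.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of the two-stage "parallel all-window" count.
// Stage 1: each keyed subwindow emits its accumulated count whenever it fires.
// Stage 2: the all-window naively sums every partial count it receives.
public class OvercountDemo {

    // Stage 2: adds up every firing it sees for one window.
    public static long secondStageSum(List<Long> firings) {
        long total = 0;
        for (long partial : firings) {
            total += partial;
        }
        return total;
    }

    public static void main(String[] args) {
        // Two subwindows with 3 and 2 elements, each firing exactly once:
        // 3 + 2 = 5, which is the true count.
        long onceEach = secondStageSum(Arrays.asList(3L, 2L));

        // Same data, but the first subwindow fires early (after 2 elements)
        // and again at the end of the window (with all 3 elements).
        // Stage 2 receives 2, 3 and 2 and sums them to 7 instead of 5.
        long withEarlyFiring = secondStageSum(Arrays.asList(2L, 3L, 2L));

        System.out.println("each subwindow fires once: " + onceEach);
        System.out.println("one early firing:          " + withEarlyFiring);
    }
}
```

A correct second stage would have to keep only the latest result per subwindow instead of adding every firing, which the simple windowAll/reduce combination cannot do.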
Cheers,
Aljoscha

On Wed, 9 Nov 2016 at 06:35 Juan Rodríguez Hortalá <[hidden email]> wrote:

Hi,
As a self-training exercise I've defined a class extending WindowedStream that implements a proof of concept for a parallel version of AllWindowedStream:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.windows.Window;

/**
 * Tries to create a parallel version of an AllWindowedStream for a DataStream
 * by creating a KeyedStream that uses as key the hash of the elements modulo
 * a parallelism level.
 *
 * This only makes sense for window assigners that ensure the subwindows will be
 * in sync, like time-based window assigners, and it is more stable with ingestion
 * and event time because the window alignment is more reliable.
 * This doesn't work for count or session window assigners.
 *
 * Also note that elements from different partitions might get out of order due
 * to parallelism.
 */
public static class ParAllWindowedStream<T, W extends Window> extends WindowedStream<T, Integer, W> {

    private final transient WindowAssigner<Object, W> windowAssigner;

    public ParAllWindowedStream(DataStream<T> stream, final int parallelism,
                                WindowAssigner<Object, W> windowAssigner) {
        super(stream.keyBy(new KeySelector<T, Integer>() {
            @Override
            public Integer getKey(T value) throws Exception {
                // % can be negative for negative hash codes; abs keeps the key
                // in [0, parallelism) so there are at most `parallelism` keys
                return Math.abs(value.hashCode() % parallelism);
            }
        }), windowAssigner);
        this.windowAssigner = windowAssigner;
    }

    @Override
    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reduceFun) {
        return super.reduce(reduceFun)   // reduce each subwindow in parallel
            .windowAll(windowAssigner)   // synchronize the subwindow results
            .reduce(reduceFun);          // sequential aggregate of the partial results
    }

    // Cannot override because we need an additional reduce function of type R
    // to recombine the result for each window
    // @Override
    public <R> SingleOutputStreamOperator<R> applyPar(ReduceFunction<T> reduceFunction,
            WindowFunction<T, R, Integer, W> function,
            ReduceFunction<R> reduceWindowsFunction) {
        return super.apply(reduceFunction, function)
            .windowAll(windowAssigner)
            .reduce(reduceWindowsFunction);
    }
}

Greetings,
Juan
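The idea behind ParAllWindowedStream can be checked outside Flink with a small plain-Java sketch. It is a model, not Flink code: the class and method names are hypothetical, and the elements of a single window are given as a list. Elements are partitioned by a non-negative hash modulo the parallelism, each partition is reduced independently, and the partial results are reduced once more. Because partitions may be combined in any order, the reduce function is assumed to be associative and commutative (like the sum and max used here); under that assumption the result matches a sequential reduce over all elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical plain-Java model of the two-stage reduce used by
// ParAllWindowedStream, applied to the elements of a single window.
public class TwoStageReduce {

    // Stage 1 routing: element goes to partition floorMod(hash, parallelism).
    // floorMod stays in [0, parallelism) even for negative hash codes.
    static <T> List<List<T>> partition(List<T> elements, int parallelism) {
        List<List<T>> parts = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            parts.add(new ArrayList<>());
        }
        for (T e : elements) {
            parts.get(Math.floorMod(e.hashCode(), parallelism)).add(e);
        }
        return parts;
    }

    // Stage 1 reduces each non-empty partition; stage 2 reduces the partials.
    // Assumes `elements` is non-empty, as a fired window always is.
    public static <T> T twoStageReduce(List<T> elements, int parallelism, BinaryOperator<T> f) {
        List<T> partials = new ArrayList<>();
        for (List<T> part : partition(elements, parallelism)) {
            if (!part.isEmpty()) {
                partials.add(part.stream().reduce(f).get());
            }
        }
        return partials.stream().reduce(f).get();
    }

    public static void main(String[] args) {
        List<Integer> window = Arrays.asList(5, -3, 8, -7, 1, 12, -4);
        int sequential = window.stream().reduce(0, Integer::sum);   // 12
        int parallel = twoStageReduce(window, 3, Integer::sum);     // 12
        System.out.println(sequential + " == " + parallel);
    }
}
```

With a non-commutative function such as string concatenation, the partitioned result would generally differ from the sequential one, which matches the caveat in the Javadoc that elements from different partitions might get out of order.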