An idea for a parallel AllWindowedStream

Juan Rodríguez Hortalá
Hi,

As a self-training exercise, I've defined a class extending WindowedStream as a proof of concept for a parallel version of AllWindowedStream:
// Imports needed at the top of the enclosing file:
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.windows.Window;

/**
 * Tries to create a parallel version of an AllWindowedStream for a DataStream
 * by creating a KeyedStream that uses as key the hash of each element modulo
 * a parallelism level.
 *
 * This only makes sense for window assigners that ensure the subwindows will be
 * in sync, like time-based window assigners, and it is more stable with ingestion
 * time and event time because the window alignment is more reliable.
 * This doesn't work for count or session window assigners.
 *
 * Also note that elements from different partitions might get out of order due
 * to parallelism.
 */
public static class ParAllWindowedStream<T, W extends Window> extends WindowedStream<T, Integer, W> {
    private final transient WindowAssigner<Object, W> windowAssigner;

    public ParAllWindowedStream(DataStream<T> stream, final int parallelism,
                                WindowAssigner<Object, W> windowAssigner) {
        super(stream.keyBy(new KeySelector<T, Integer>() {
                  @Override
                  public Integer getKey(T value) throws Exception {
                      // floorMod (Java 8+) keeps the key in [0, parallelism)
                      // even when hashCode() is negative; plain % would not
                      return Math.floorMod(value.hashCode(), parallelism);
                  }
              }),
              windowAssigner);
        this.windowAssigner = windowAssigner;
    }

    @Override
    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reduceFun) {
        return super.reduce(reduceFun)         // reduce each subwindow
                    .windowAll(windowAssigner) // synchronize
                    .reduce(reduceFun);        // sequential aggregate of the subwindow results
    }

    // Not an override of apply: we need an additional reduce function of type R
    // to recombine the results for each window
    public <R> SingleOutputStreamOperator<R> applyPar(ReduceFunction<T> reduceFunction,
                                                      WindowFunction<T, R, Integer, W> function,
                                                      ReduceFunction<R> reduceWindowsFunction) {
        return super.apply(reduceFunction, function)
                    .windowAll(windowAssigner)
                    .reduce(reduceWindowsFunction);
    }
}
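
The two-stage reduction can be checked without a Flink cluster. What follows is a minimal sketch in plain Java, not Flink code: it assumes the contents of a single (non-empty) window fit in a list, and the names `TwoStageReduceSketch`, `reduceAll` and `reduceInParallelStyle` are hypothetical, chosen only for this illustration. It buckets the elements by hash modulo the parallelism, reduces each bucket, and then reduces the partial results. That should match a single reduce over the whole window as long as the reduce function is associative and commutative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class TwoStageReduceSketch {

    // Single-stage reduce over all elements of one non-empty window,
    // standing in for what a plain windowAll(...).reduce(...) computes.
    public static <T> T reduceAll(List<T> window, BinaryOperator<T> reduceFun) {
        T acc = window.get(0);
        for (int i = 1; i < window.size(); i++) {
            acc = reduceFun.apply(acc, window.get(i));
        }
        return acc;
    }

    // Two-stage reduce: one partial result per key, then a final reduce
    // over the partial results.
    public static <T> T reduceInParallelStyle(List<T> window, int parallelism,
                                              BinaryOperator<T> reduceFun) {
        Map<Integer, T> partials = new HashMap<>();
        for (T value : window) {
            // Same key as in the post: hash of the element modulo the parallelism
            int key = Math.floorMod(value.hashCode(), parallelism);
            // Stage 1: fold the element into the partial result for its key
            partials.merge(key, value, reduceFun);
        }
        // Stage 2: combine the partial results sequentially
        return reduceAll(new ArrayList<>(partials.values()), reduceFun);
    }

    public static void main(String[] args) {
        List<Integer> window = Arrays.asList(5, -3, 12, 7, -8, 20);
        BinaryOperator<Integer> sum = Integer::sum;
        System.out.println(reduceAll(window, sum));                // prints 33
        System.out.println(reduceInParallelStyle(window, 4, sum)); // prints 33
    }
}
```

With a reduce function that is not associative and commutative (subtraction, say), the two results can differ, which is worth keeping in mind when choosing the function passed to reduce.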
Greetings,

Juan

Re: An idea for a parallel AllWindowedStream

Aljoscha Krettek
Hi,
yes, this works well in some cases, and I was also thinking about adding something like this to Flink.

There can be problems if you use a trigger other than EventTimeTrigger that possibly fires multiple times or if you specify an allowed lateness. In those cases, you would overcount elements in the all-window.
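
To make the overcounting concrete, here is a small plain-Java sketch (not Flink code; `OvercountSketch` and its method names are hypothetical). It assumes the first stage emits its running sum every time the trigger fires without purging the window state, and that the all-window stage simply sums everything it receives for that window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OvercountSketch {

    // First stage: a keyed subwindow that emits its running sum on each firing.
    // fireAfter lists the element counts after which the trigger fires
    // (an assumption of this sketch, standing in for a real trigger).
    public static List<Integer> subwindowEmissions(List<Integer> elements,
                                                   List<Integer> fireAfter) {
        List<Integer> emitted = new ArrayList<>();
        int runningSum = 0;
        for (int i = 0; i < elements.size(); i++) {
            runningSum += elements.get(i);
            if (fireAfter.contains(i + 1)) {
                emitted.add(runningSum); // state is kept, not purged
            }
        }
        return emitted;
    }

    // Second stage: the all-window sums every value it received.
    public static int allWindowSum(List<Integer> received) {
        int total = 0;
        for (int v : received) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        List<Integer> elements = Arrays.asList(1, 2, 3, 4);

        // Trigger fires once, at the end of the window: the result is correct.
        List<Integer> once = subwindowEmissions(elements, Arrays.asList(4));
        System.out.println(allWindowSum(once));  // prints 10

        // Trigger fires twice (after 2 and after 4 elements): the early
        // partial sum 3 is counted again on top of the final sum 10.
        List<Integer> twice = subwindowEmissions(elements, Arrays.asList(2, 4));
        System.out.println(allWindowSum(twice)); // prints 13
    }
}
```

A late firing caused by allowed lateness has the same shape: the subwindow re-emits an updated result, and the all-window adds it to the one it already received.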

Cheers,
Aljoscha

On Wed, 9 Nov 2016 at 06:35 Juan Rodríguez Hortalá <[hidden email]> wrote:
> [...]