apply with fold- and window function

apply with fold- and window function

kaelumania
Hello,

I wondered whether there is a particular reason why the window function must have the same input and output type:
    public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function)
For example, the following does not work:
    DataStream<SensorAggregatedValue> aggregates = values
        .assignTimestampsAndWatermarks(new SensorTimestampsAndWatermarks())
        .keyBy("id")
        .timeWindow(Time.minutes(1))
        .apply(new SensorValueAccumulator(), new AccumulateSensorValues(), new AggregateSensorValues());
It fails because my accumulator object does not carry any id or timestamp information, just count, sum, min, max, etc. Only in the window function do I receive the key (sensorId) and the time window (start/end), at which point I can build an aggregated value with all the information needed. Currently, however, the apply function forces me to use one cluttered class with id, count, sum, …, in which the id, start, and end time are invalid during the fold function.
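To make the request concrete, here is a minimal plain-Java sketch of the separation described above: the fold step only sees an accumulator, and a final step combines that accumulator with the key and the window bounds. It deliberately does not use the Flink API. The fields of SensorValueAccumulator and SensorAggregatedValue, and the helper names fold, finish, and aggregate, are assumptions made for illustration only.

```java
import java.util.Arrays;
import java.util.List;

class SeparateAccumulatorSketch {

    // Assumed accumulator layout: running statistics only, no id or timestamps.
    static final class SensorValueAccumulator {
        long count = 0;
        double sum = 0.0;
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
    }

    // Assumed result layout: built once per key and window, after folding.
    static final class SensorAggregatedValue {
        final String id;
        final long start;
        final long end;
        final long count;
        final double mean;
        final double min;
        final double max;

        SensorAggregatedValue(String id, long start, long end, SensorValueAccumulator acc) {
            this.id = id;
            this.start = start;
            this.end = end;
            this.count = acc.count;
            this.mean = acc.count == 0 ? Double.NaN : acc.sum / acc.count;
            this.min = acc.min;
            this.max = acc.max;
        }
    }

    // Fold step: updates the statistics and knows nothing about key or window.
    static SensorValueAccumulator fold(SensorValueAccumulator acc, double value) {
        acc.count++;
        acc.sum += value;
        acc.min = Math.min(acc.min, value);
        acc.max = Math.max(acc.max, value);
        return acc;
    }

    // Window step: the only place where key and window bounds are available.
    static SensorAggregatedValue finish(String key, long start, long end, SensorValueAccumulator acc) {
        return new SensorAggregatedValue(key, start, end, acc);
    }

    // Simulates one key and one window: fold every value, then finish once.
    static SensorAggregatedValue aggregate(String key, long start, long end, List<Double> values) {
        SensorValueAccumulator acc = new SensorValueAccumulator();
        for (double v : values) {
            acc = fold(acc, v);
        }
        return finish(key, start, end, acc);
    }

    public static void main(String[] args) {
        SensorAggregatedValue r = aggregate("sensor-1", 0L, 60_000L, Arrays.asList(1.0, 2.0, 6.0));
        System.out.println(r.id + " count=" + r.count + " mean=" + r.mean);
    }
}
```

With the signature quoted above, the accumulator type and the result type must be the same R, which is why this two-type split cannot be passed to apply directly.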

kind regards,
Stephan

Re: apply with fold- and window function

Aljoscha Krettek
Hi,

I'm still hoping that we can get a workaround in for Flink 1.2. See my last comment in the Jira Issue.

Cheers,
Aljoscha

(In reply to Stephan Epping's message of Mon, 14 Nov 2016 at 14:49.)

Re: apply with fold- and window function

Anchit Jatana
In reply to this post by kaelumania
Hi Stephan,

I faced a similar issue. The way I implemented this (though it is a workaround) is by making the initial value passed to the fold function the same type as what goes into the window function.

I made the initial value of the fold function a tuple, with every field that is not required or not yet available at fold time set to null.

The idea is to pass one consistent tuple type through both functions and let each function update only the fields it is responsible for.

The tuple returned after both operations then has all of its fields set.

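// The key slot starts as null. A String key is assumed here so that the
// tuple type is not inferred as (Null, List[String]).
val DEFAULT_ACCUMULATOR_VALUE: (String, List[String]) = (null, List[String]())

.apply(DEFAULT_ACCUMULATOR_VALUE,
       new MyFoldFunction(),     // folds values into index 1, i.e. the list
       new MyWindowFunction())   // simply puts the key in at index 0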

In the end I achieve the aggregation through the fold function and the element processing through the window function.
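For readers on the Java API, the same shared-type idea can be sketched in plain Java without any Flink classes. The KeyedValues class, the String key, and the helper names fold, withKey, and run are hypothetical stand-ins for the tuple and the two functions above, not part of any library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class SharedTupleSketch {

    // Hypothetical stand-in for the (key, list) tuple used by both steps.
    static final class KeyedValues {
        final String key;           // null until the window step fills it in
        final List<String> values;  // filled by the fold step

        KeyedValues(String key, List<String> values) {
            this.key = key;
            this.values = Collections.unmodifiableList(new ArrayList<>(values));
        }
    }

    // Initial value: key not yet known, list empty.
    static final KeyedValues DEFAULT_ACCUMULATOR_VALUE =
            new KeyedValues(null, Collections.<String>emptyList());

    // Fold step: only touches the list and leaves the key slot as it was.
    static KeyedValues fold(KeyedValues acc, String value) {
        List<String> next = new ArrayList<>(acc.values);
        next.add(value);
        return new KeyedValues(acc.key, next);
    }

    // Window step: same input and output type, only fills in the key.
    static KeyedValues withKey(String key, KeyedValues folded) {
        return new KeyedValues(key, folded.values);
    }

    // Simulates one key and one window: fold all elements, then set the key.
    static KeyedValues run(String key, List<String> elements) {
        KeyedValues acc = DEFAULT_ACCUMULATOR_VALUE;
        for (String e : elements) {
            acc = fold(acc, e);
        }
        return withKey(key, acc);
    }

    public static void main(String[] args) {
        KeyedValues result = run("sensor-1", Arrays.asList("x", "y"));
        System.out.println(result.key + " " + result.values);
    }
}
```

Each fold in this sketch returns a fresh object rather than mutating the shared initial value, a design choice made here so that the default can be reused safely across keys and windows.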

Hope this helps!

Regards,
Anchit