I think you would need something like this:
    var hourlyDiscarding = stream
      .window(1.hour)
      .trigger(discarding)
      .apply(..)

    // write to cassandra
    hourlyDiscarding
      .window(1.hour)
      .trigger(accumulating)
      .apply(..)
      .addSink(cassandra)

    // forward to next acc step
    var daily = hourlyDiscarding
      .window(1.day)
      .trigger(accumulating)
      .apply(…)

    // write to cassandra
    daily.addSink(cassandra)
The decision between accumulating/discarding happens at the point where the window is defined, not downstream (this is the same as in Beam).
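To illustrate why the extra accumulating window in front of the Cassandra sink recovers the accumulated aggregates, here is a minimal plain-Java sketch. It does not use Flink at all: `AccumulateDeltas` and its `accumulate` method are hypothetical names, and the deltas are made-up values standing in for what a discarding hourly window might emit (one on-time firing, one late firing).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AccumulateDeltas {
    // Re-accumulate the partial results emitted by a discarding window:
    // each output is the running total of all deltas seen so far.
    public static List<Long> accumulate(List<Long> deltas) {
        List<Long> totals = new ArrayList<>();
        long running = 0;
        for (long delta : deltas) {
            running += delta;
            totals.add(running);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical deltas for one hour: on-time firing, then one late firing.
        List<Long> deltas = Arrays.asList(7L, 3L);
        System.out.println("rows for the sink: " + accumulate(deltas));
    }
}
```

With these inputs the sink would see 7 and then 10, while the next window downstream still receives only the deltas 7 and 3.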
Sounds good to me. But I still need to have some kind of side output (cassandra) that stores the accumulating aggregates on each time scale (minute, hour). Thus I would need to have something like this
    var hourly = stream.window(1.hour).apply(..)

    // write to cassandra
    hourly.trigger(accumulating).addSink(cassandra)

    // forward to next acc step
    var daily = hourly.trigger(discarding).window(1.day).apply(…)

    // write to cassandra
    daily.trigger(accumulating).addSink(cassandra)

Would this be possible?

On 23 Nov 2016, at 11:16, Aljoscha Krettek [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
You can implement discarding behaviour by writing a custom trigger (based on EventTimeTrigger) that returns FIRE_AND_PURGE when firing. With this you could maybe implement a cascade of windows where the first window aggregates over the smallest time interval and is discarding, and where the subsequent windows take these "pre-aggregated" values and accumulate them.
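As a rough illustration of what the two trigger results mean for a window's contents, here is a self-contained plain-Java model. It is not Flink code: `TriggerResult` below is a local enum that only mirrors the names FIRE and FIRE_AND_PURGE, and `WindowModel` is a hypothetical stand-in for a window that sums its elements.

```java
import java.util.ArrayList;
import java.util.List;

public class TriggerSemantics {
    // Local stand-in that mirrors the names of the trigger results; not Flink's class.
    public enum TriggerResult { FIRE, FIRE_AND_PURGE }

    // Hypothetical model of a single window that sums its elements.
    static final class WindowModel {
        private long sum = 0;
        private final List<Long> emitted = new ArrayList<>();

        void add(long value) {
            sum += value;
        }

        // Emit the current contents; drop them afterwards if the result says to purge.
        void onTrigger(TriggerResult result) {
            emitted.add(sum);
            if (result == TriggerResult.FIRE_AND_PURGE) {
                sum = 0;
            }
        }

        List<Long> emitted() {
            return emitted;
        }
    }

    // On-time elements arrive and the window fires; then late elements arrive and it fires again.
    public static List<Long> run(TriggerResult result, long[] onTime, long[] late) {
        WindowModel window = new WindowModel();
        for (long v : onTime) window.add(v);
        window.onTrigger(result);
        for (long v : late) window.add(v);
        window.onTrigger(result);
        return window.emitted();
    }

    public static void main(String[] args) {
        long[] onTime = {2, 3};
        long[] late = {4};
        System.out.println("FIRE           -> " + run(TriggerResult.FIRE, onTime, late));
        System.out.println("FIRE_AND_PURGE -> " + run(TriggerResult.FIRE_AND_PURGE, onTime, late));
    }
}
```

In this model FIRE emits [5, 9] (the late firing repeats the earlier elements), while FIRE_AND_PURGE emits [5, 4] (the late firing carries only what arrived since the previous firing).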
On Tue, 22 Nov 2016 at 08:11 Stephan Epping <[hidden email]> wrote:
Hey Aljoscha,
the first solution did not work out as expected: when late elements arrive, the first window is triggered again and emits a new (accumulated) event, which is then counted twice in the second window (once in the on-time accumulation and once in the late accumulation). I could implement my own discarding strategy, as in Apache Beam, but the output stream should contain accumulated events that are stored in Cassandra. The second solution just gave a compiler error, so I think it is not possible right now.
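The double counting can be reproduced without Flink in a few lines of plain Java. This is only a sketch under simplifying assumptions: the first-level window fires exactly twice (once on time, once for late data), the second-level window sums everything it receives, and all names (`CascadeDoubleCount`, `firstWindowEmissions`, `secondWindowSum`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class CascadeDoubleCount {
    // Emissions of a first-level window that fires once on time and once more
    // for late data. Accumulating re-emits the full sum; discarding emits only
    // what arrived since the previous firing.
    public static List<Long> firstWindowEmissions(long onTimeSum, long lateSum, boolean discarding) {
        List<Long> out = new ArrayList<>();
        out.add(onTimeSum);
        out.add(discarding ? lateSum : onTimeSum + lateSum);
        return out;
    }

    // Second-level window that simply sums whatever it receives from upstream.
    public static long secondWindowSum(List<Long> upstream) {
        long total = 0;
        for (long v : upstream) total += v;
        return total;
    }

    public static void main(String[] args) {
        long onTime = 5;
        long late = 4; // the true total is 9
        System.out.println("accumulating upstream: "
            + secondWindowSum(firstWindowEmissions(onTime, late, false)));
        System.out.println("discarding upstream:   "
            + secondWindowSum(firstWindowEmissions(onTime, late, true)));
    }
}
```

With an accumulating first window the second window ends up at 14 (5 + 9), so the on-time elements are counted twice; with a discarding first window it ends up at the true total of 9.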
best Stephan
On 21 Nov 2016, at 17:56, Aljoscha Krettek <[hidden email]> wrote:
Hi, why did you settle for the last solution?
Cheers, Aljoscha
On Thu, 17 Nov 2016 at 15:57 kaelumania <[hidden email]> wrote:
Hi Fabian,
your proposed solution for:

- Multiple window aggregations

You can construct a data flow of cascading window operators and fork off (to emit or further processing) the result after each window.

    Input -> window(15 secs) -> window(1 min) -> window(15 min) -> ...
                            \-> out_1        \-> out_2         \-> out_3

does not work, am I missing something?
First I tried the following:

    DataStream<Reading> values = input
        .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()); // force lateness

    DataStream<ReadingAggregate> aggregatesPerMinute = values
        .keyBy("id")
        .timeWindow(Time.minutes(1))
        .allowedLateness(Time.minutes(2))
        .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());

    DataStream<ReadingAggregate> aggregatesPerHour = aggregatesPerMinute
        .keyBy("id")
        .timeWindow(Time.hours(1))
        .allowedLateness(Time.hours(2))
        .apply(new AggregateReadingAggregates(), new AggregateReadingAggregates());

but due to late data the first fold function would emit 2 rolling aggregates (one with and one without the late element), which results in the late window being counted twice within the second reducer. Therefore I tried:

    WindowedStream<Reading, Tuple, TimeWindow> readingsPerMinute = input
        .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
        .keyBy("id")
        .timeWindow(Time.minutes(1))
        .allowedLateness(Time.hours(2));

    WindowedStream<Reading, Tuple, TimeWindow> readingsPerHours = readingsPerMinute
        .timeWindow(Time.hours(1))
        .allowedLateness(Time.hours(2));

    DataStream<ReadingAggregate> aggregatesPerMinute = readingsPerMinute
        .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());

    DataStream<ReadingAggregate> aggregatesPerHour = readingsPerHours
        .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());

which gives me a compiler error, as WindowedStream does not provide a timeWindow method.
Finally I settled with this:

    KeyedStream<Reading, Tuple> readings = input
        .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
        .keyBy("id");

    DataStream<ReadingAggregate> aggregatesPerMinute = readings
        .timeWindow(Time.minutes(1))
        .allowedLateness(Time.hours(2))
        .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());

    DataStream<ReadingAggregate> aggregatesPerHour = readings
        .timeWindow(Time.hours(1))
        .allowedLateness(Time.hours(2))
        .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
Feedback is very welcome.
best,
Stephan

On 11 Nov 2016, at 00:29, Fabian Hueske-2 [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:

Hi Stephan,

I just wrote an answer to your SO question.

Best, Fabian
View this message in context: Re: Maintaining watermarks per key, instead of per operator instance
Sent from the Apache Flink User Mailing List archive at Nabble.com.