flink streaming - window chaining example

Chen Bekor
Hi all!

I'm just getting started with Flink and have a design question.

I'm trying to aggregate incoming events (source: a Kafka topic) over a 10-minute tumbling window in order to calculate the incoming event rate (total events per minute).
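A minimal plain-Scala model of this first step, not Flink code: each event is reduced to its epoch-millisecond timestamp, assigned to a 10-minute tumbling window by rounding down to a multiple of the window size, and each window's count is divided by 10 to give events per minute. The object and method names are hypothetical.

```scala
// Plain-Scala model of a 10-minute tumbling window (not the Flink API).
object TenMinuteRate {
  val WindowSizeMs: Long = 10 * 60 * 1000L // 10 minutes in milliseconds

  // Start of the tumbling window that a timestamp (ms, >= 0) falls into.
  def windowStart(ts: Long, sizeMs: Long = WindowSizeMs): Long = ts - (ts % sizeMs)

  // Count events per 10-minute window and express each count as events per minute.
  def ratePerMinute(timestamps: Seq[Long]): Map[Long, Double] =
    timestamps
      .groupBy(ts => windowStart(ts))
      .map { case (start, evs) => start -> evs.size / 10.0 }
}
```

For example, timestamps 0, 60000 and 599999 fall into the window starting at 0 (3 events, so 0.3 per minute), while 600000 opens the next window.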

I would then like to feed the output of this window into a second, 60-minute window in order to calculate percentiles, standard deviation and some other statistics over that time span. Finally, I would like to trigger some business logic when a calculation crosses a certain threshold.
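A sketch of the hourly statistics under stated assumptions: the input is the sequence of per-minute rates produced by the 10-minute windows of one hour, the standard deviation is the population one, percentiles use the nearest-rank method, and the business rule (alert when the 95th percentile exceeds a limit) is a hypothetical example. It is plain Scala with no Flink types; the names are illustrative.

```scala
// Plain-Scala sketch of the hourly statistics (not a Flink API).
// All functions assume a non-empty input sequence.
object HourlyStats {
  def mean(xs: Seq[Double]): Double = xs.sum / xs.size

  // Population standard deviation.
  def stdDev(xs: Seq[Double]): Double = {
    val m = mean(xs)
    math.sqrt(xs.map(x => (x - m) * (x - m)).sum / xs.size)
  }

  // Nearest-rank percentile for p in (0, 100].
  def percentile(xs: Seq[Double], p: Double): Double = {
    val sorted = xs.sorted
    val rank = math.ceil(p / 100.0 * sorted.size).toInt
    sorted(math.max(rank, 1) - 1)
  }

  // Hypothetical business rule: alert when the 95th-percentile rate exceeds a limit.
  def shouldAlert(ratesPerMinute: Seq[Double], limit: Double): Boolean =
    percentile(ratesPerMinute, 95.0) > limit
}
```

Nearest-rank always returns one of the observed values, which keeps the threshold check easy to explain; an interpolating percentile would be an equally valid choice.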

My main challenge is how to chain the two windows together.

Any help is appreciated (please send Scala example code; I'm not using Java for this project :))

Re: flink streaming - window chaining example

Balaji Rajagopalan

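val stream: DataStream[String] = env
  .addSource(new FlinkKafkaConsumer08[String]("topic_name", new SimpleStringSchema, prop))
val events: DataStream[SomeEventObj] = stream.map(MyMapFunction)
// 10-minute window over all events; fires every minute with the running (unpurged) aggregate
val tenMinute: DataStream[AggEvents] = events
  .timeWindowAll(Time.minutes(10))
  .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1)))
  .apply(MyAllWindowFunction) // placeholder AllWindowFunction producing AggEvents
// the 10-minute results are the input of the keyed 60-minute window
val oneHour = tenMinute.keyBy(_.mykey)
  .window(TumblingEventTimeWindows.of(Time.minutes(60)))
  .trigger(MyTriggerFunction).apply(MyWindowFunction) // placeholder trigger and window function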

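The above is pseudocode and may have some syntax errors, but it should do what you are looking for (the Scala API also needs import org.apache.flink.streaming.api.scala._ in scope). The one-hour window consumes the output of the ten-minute window, so the two are chained: each hourly window is evaluated over the ten-minute results rather than over the raw events.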
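To see why feeding one windowed stream into another works, here is a plain-Scala model (not the Flink API) of the two stages. It assumes, as we understand Flink's behavior, that each window result carries the timestamp of the window's last millisecond (end - 1), so every 10-minute result falls into the 60-minute window that encloses it. The object and method names are hypothetical.

```scala
// Plain-Scala model of chaining two tumbling windows (not the Flink API).
// Assumption: each first-stage result is stamped with its window's last
// millisecond (end - 1), mirroring how Flink timestamps window output.
object ChainedWindows {
  val TenMin: Long = 10 * 60 * 1000L
  val OneHour: Long = 6 * TenMin

  // Start of the tumbling window of the given size that ts falls into.
  def start(ts: Long, size: Long): Long = ts - (ts % size)

  // Stage 1: event timestamps -> (resultTimestamp, eventsPerMinute), one per 10-minute window.
  def tenMinuteRates(events: Seq[Long]): Seq[(Long, Double)] =
    events.groupBy(start(_, TenMin)).toSeq.sortBy(_._1).map {
      case (s, evs) => (s + TenMin - 1, evs.size / 10.0)
    }

  // Stage 2: re-window the stage-1 results by hour, keeping each hour's rates in order.
  def hourlyRates(rates: Seq[(Long, Double)]): Map[Long, Seq[Double]] =
    rates
      .groupBy { case (ts, _) => start(ts, OneHour) }
      .map { case (hour, rs) => hour -> rs.map(_._2) }
}
```

With events at 0, 1, 10, 59 and 60 minutes, the first three 10-minute results (windows ending at 10, 20 and 60 minutes) land in the first hour and the fourth lands in the second hour.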

(In reply to Chen Bekor's message of Sun, Mar 27, 2016 at 2:20 PM.)