Flink multiple windows


Antonio Saldivar Lezama
Hello 

Has anyone worked this way? I am asking because I need to compute aggregations (sum and count) over multiple window sizes (10, 20, and 30 minutes). Please let me know whether this works properly or whether there is a better solution.


DataStream<String> data = ...
// append a Long 1 to each record so that counting becomes a sum
DataStream<Tuple2<String, Long>> withOnes = data.map(new AppendOne());

// note: Java identifiers cannot start with a digit, hence oneMinCnts rather than 1minCnts
DataStream<Tuple2<String, Long>> oneMinCnts = withOnes
  // key by the String field
  .keyBy(0)
  // define a tumbling time window of 1 minute
  .timeWindow(Time.minutes(1))
  // sum the ones in the Long field
  // in practice you want to use an incrementally aggregating ReduceFunction and
  // a WindowFunction to extract the start/end timestamp of the window
  .sum(1);

// emit 1-min counts to wherever you need them
oneMinCnts.addSink(new YourSink());

// compute 5-min counts based on the 1-min counts
DataStream<Tuple2<String, Long>> fiveMinCnts = oneMinCnts
  // key by the String field
  .keyBy(0)
  // define a tumbling time window of 5 minutes
  .timeWindow(Time.minutes(5))
  // sum the 1-minute counts in the Long field
  .sum(1);

// emit 5-min counts to wherever you need them
fiveMinCnts.addSink(new YourSink());

// continue with 1 day window and 1 week window

Thank you. Regards



Re: Flink multiple windows

Fabian Hueske-2
Hi Antonio,

Cascading window aggregations, as in your example, are a good idea and are preferable if the aggregation function is combinable, which is true for sum (count can be computed as a sum of 1s).
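To illustrate what "combinable" means here, the following is a minimal sketch in plain Java rather than Flink API code. The class name CascadeSketch, the method rollUp, and the per-window counts are all hypothetical; the sketch assumes the partial results arrive in order, one per smaller tumbling window, for a single key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CascadeSketch {

    /**
     * Sums consecutive groups of `factor` partial results, for example
     * two 10-minute counts into one 20-minute count.
     * A trailing incomplete group is summed as-is.
     */
    static List<Long> rollUp(List<Long> partials, int factor) {
        List<Long> out = new ArrayList<>();
        for (int start = 0; start < partials.size(); start += factor) {
            int end = Math.min(start + factor, partials.size());
            long sum = 0L;
            for (int i = start; i < end; i++) {
                sum += partials.get(i);
            }
            out.add(sum);
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical 10-minute counts for one key over one hour.
        List<Long> tenMin = Arrays.asList(3L, 1L, 4L, 1L, 5L, 9L);

        // Both larger windows are derived from the 10-minute results.
        List<Long> twentyMin = rollUp(tenMin, 2);
        List<Long> thirtyMin = rollUp(tenMin, 3);

        System.out.println(twentyMin); // [4, 5, 14]
        System.out.println(thirtyMin); // [8, 15]
    }
}
```

For the 10, 20, and 30 minute sizes in the question, both larger windows would be fed from the 10-minute results, since a 30-minute tumbling window cannot be assembled from whole 20-minute windows.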

Best, Fabian

In reply to antonio saldivar <[hidden email]>, 2018-06-09 4:00 GMT+02:00




Re: Flink multiple windows

Antonio Saldivar Lezama
Thank you very much, Fabian. I found that solution in the link below, and it is the best fit for my use case.

I am still testing how to count (for example, numTransactions >= 3) and then sum. I am also using tumbling windows.
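One possible reading of the count-then-sum step, sketched in plain Java rather than Flink API code: carry a (count, sum) pair through the cascade, since both fields are combinable, and apply the threshold only to the final window. The names CountSumSketch, CountSum, and sumIfCountAtLeast, as well as the sample values, are hypothetical; only the numTransactions >= 3 threshold comes from the message above.

```java
import java.util.OptionalLong;

public class CountSumSketch {

    /** Partial aggregate for one key in one window. */
    static final class CountSum {
        final long count;
        final long sum;

        CountSum(long count, long sum) {
            this.count = count;
            this.sum = sum;
        }

        /** Combines two partial aggregates; count and sum are both plain sums. */
        CountSum merge(CountSum other) {
            return new CountSum(count + other.count, sum + other.sum);
        }
    }

    /** Returns the sum only if the window saw at least minCount records. */
    static OptionalLong sumIfCountAtLeast(CountSum window, long minCount) {
        return window.count >= minCount
                ? OptionalLong.of(window.sum)
                : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Hypothetical 10-minute partials for one key: (numTransactions, total amount).
        CountSum first = new CountSum(2, 150);
        CountSum second = new CountSum(1, 50);

        // The 20-minute window is the merge of the two 10-minute partials.
        CountSum twentyMin = first.merge(second);

        System.out.println(sumIfCountAtLeast(first, 3).isPresent());       // false
        System.out.println(sumIfCountAtLeast(twentyMin, 3).getAsLong());   // 200
    }
}
```

Under this reading, the threshold is checked on each window's own merged result. Filtering the smaller windows first would drop partials that the larger window still needs to reach the threshold.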

Best Regards

In reply to Fabian Hueske <[hidden email]>, 2018-06-10 6:04 GMT-04:00