Hi all,
I use window aggregation to create a stream of aggregated data per user, per some interval. Additionally, I use the same windows to aggregate system-wide data over the same interval, i.e.:

Per-user stream: events keyed by user ID -> tumbling window -> aggregation
System-wide stream: events -> tumbling window (windowAll) -> aggregation

I need to produce a value per user, per interval, that depends on both the aggregated data of that user and the system-wide data aggregated for the corresponding interval.

I couldn't find a way to achieve this with Flink's windows. I think I can get it to work with broadcast, connect and CoProcessFunction. Is that the way to go? How would I handle late events that way?

Thanks!
- Nathan

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
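For reference, the result being asked for can be stated in plain Java, outside Flink. This is only an illustration under assumptions: the names (`WindowJoinSketch`, `Event`, `userShares`), sum as the aggregation, event-time timestamps in milliseconds, and "the user's share of the system-wide total" as the combined per-user value are all hypothetical, and records need Java 16+. It assigns each event to a tumbling window by its start time, computes the per-user and the system-wide sums, and joins the two on the window start. In a Flink job, that final join is the step the connect/CoProcessFunction (or broadcast) approach would have to perform, buffering one side in state until the matching window's result arrives from the other.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration (no Flink API): what the per-user + system-wide join should compute.
public class WindowJoinSketch {

    // Hypothetical event type: user ID, event time in ms, and a numeric value to aggregate.
    record Event(String userId, long timestamp, long value) {}

    // Start of the tumbling window containing the timestamp
    // (assumes non-negative timestamps and no window offset).
    static long windowStart(long timestamp, long windowSizeMs) {
        return timestamp - (timestamp % windowSizeMs);
    }

    // "Per-user stream": sum values per (window, user).
    static Map<Long, Map<String, Long>> perUserSums(List<Event> events, long sizeMs) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (Event e : events) {
            result.computeIfAbsent(windowStart(e.timestamp(), sizeMs), w -> new HashMap<>())
                  .merge(e.userId(), e.value(), Long::sum);
        }
        return result;
    }

    // "System-wide stream": sum all values per window.
    static Map<Long, Long> globalSums(List<Event> events, long sizeMs) {
        Map<Long, Long> result = new TreeMap<>();
        for (Event e : events) {
            result.merge(windowStart(e.timestamp(), sizeMs), e.value(), Long::sum);
        }
        return result;
    }

    // Join both results on the window start: per user, per window,
    // the user's share of the system-wide total (hypothetical combined value).
    static Map<Long, Map<String, Double>> userShares(List<Event> events, long sizeMs) {
        Map<Long, Long> global = globalSums(events, sizeMs);
        Map<Long, Map<String, Double>> shares = new TreeMap<>();
        perUserSums(events, sizeMs).forEach((window, users) -> {
            long total = global.get(window); // every per-user window also has a global entry
            Map<String, Double> perWindow = new HashMap<>();
            users.forEach((user, sum) -> perWindow.put(user, (double) sum / total));
            shares.put(window, perWindow);
        });
        return shares;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("alice", 1_000, 3),
            new Event("bob", 2_000, 1),
            new Event("alice", 61_000, 2),
            new Event("bob", 62_000, 2));
        // Window 0: alice 0.75, bob 0.25; window 60000: alice 0.5, bob 0.5
        // (inner HashMap print order may vary).
        System.out.println(userShares(events, 60_000));
    }
}
```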
Hi,
So you are trying to use the same window definition, but you want to aggregate the data in two different ways:

1. keyBy(userId)
2. Global aggregation

Do you want to use exactly the same aggregation functions? If not, you can just process the events twice:

    DataStream<…> events = …;

    DataStream<…> keyedEvents = events
        .keyBy(…)
        .window(…)
        .process(f); // instead of process this can be whatever you want: aggregate/apply/reduce/...

    DataStream<…> nonKeyedEvents = events
        .windowAll(…)
        .process(g);

From here you can process keyedEvents and nonKeyedEvents as you prefer.

If yes, i.e. if both the global and the non-global aggregation use a similar or the same aggregation function, you could try to use `keyedEvents` as pre-aggregated input for `.windowAll(…)`:

    DataStream<…> keyedEvents = events.keyBy(…).window(…).process(f);
    keyedEvents.print(); // or further process keyedEvents
    DataStream<…> nonKeyedEvents = keyedEvents.windowAll(…).process(f');

But this assumes that the output of your `process(f)` can be re-processed by f'. This second approach can minimise the amount of work done by the global aggregation, since it typically receives only one pre-aggregated record per key and window. In the first approach, all of the records have to be processed by a single operator (the global aggregation), which can be a performance bottleneck.

Piotrek

> On 24 Nov 2019, at 14:20, natasky <[hidden email]> wrote:
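A plain-Java sketch (not Flink API code) of why the second approach works when the aggregation can be re-applied to its own output: summing per-user partial sums per window gives the same totals as summing the raw events, while the global step sees one record per (user, window) instead of every event. The names (`TwoStageAggregation`, `Event`, `UserWindowSum`) and the choice of sum are assumptions for illustration; records need Java 16+. Not every aggregation re-applies this way: an average, for example, cannot be rebuilt from per-user averages, so the partials would have to carry (sum, count). On the Flink side, the "Consecutive windowed operations" section of the windows documentation describes chaining a keyed window into a windowAll of the same size, with results landing in the matching window.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration (no Flink API) of the two-stage aggregation:
// stage 1 ~ keyBy(user).window(...).process(f), stage 2 ~ windowAll(...).process(f').
public class TwoStageAggregation {

    // Hypothetical event type: user ID, event time in ms, value to sum.
    record Event(String userId, long timestamp, long value) {}

    // Hypothetical output of the keyed window function f: one partial sum per (user, window).
    record UserWindowSum(String userId, long windowStart, long sum) {}

    // Tumbling window start (assumes non-negative timestamps and no window offset).
    static long windowStart(long timestamp, long sizeMs) {
        return timestamp - (timestamp % sizeMs);
    }

    // Stage 1: pre-aggregate per (user, window).
    static List<UserWindowSum> keyedStage(List<Event> events, long sizeMs) {
        Map<String, Map<Long, Long>> sums = new HashMap<>();
        for (Event e : events) {
            sums.computeIfAbsent(e.userId(), u -> new HashMap<>())
                .merge(windowStart(e.timestamp(), sizeMs), e.value(), Long::sum);
        }
        List<UserWindowSum> out = new ArrayList<>();
        sums.forEach((user, perWindow) ->
            perWindow.forEach((w, s) -> out.add(new UserWindowSum(user, w, s))));
        return out;
    }

    // Stage 2 (f'): re-aggregate the partial sums per window.
    static Map<Long, Long> globalFromPartials(List<UserWindowSum> partials) {
        Map<Long, Long> totals = new HashMap<>();
        for (UserWindowSum p : partials) {
            totals.merge(p.windowStart(), p.sum(), Long::sum);
        }
        return totals;
    }

    // Reference: global sums computed directly from raw events (the first approach).
    static Map<Long, Long> globalFromEvents(List<Event> events, long sizeMs) {
        Map<Long, Long> totals = new HashMap<>();
        for (Event e : events) {
            totals.merge(windowStart(e.timestamp(), sizeMs), e.value(), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("alice", 1_000, 3), new Event("alice", 5_000, 4),
            new Event("bob", 2_000, 1), new Event("bob", 61_000, 2));
        List<UserWindowSum> partials = keyedStage(events, 60_000);
        // Both maps hold {0=8, 60000=2}; stage 2 sees 3 partials instead of 4 events.
        System.out.println(globalFromPartials(partials).equals(globalFromEvents(events, 60_000))); // prints "true"
    }
}
```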