Hi,
I want to aggregate hits by Country, State and City. I would have these as tags in my sample data. How would I do aggregation at different levels? The input data would be a single record.

Should I do a flatMap transformation first and create 3 records from 1 input record, or is there a better way to do it?

thank you,
basanth
For example, this is a sample model from one of the Apache Apex presentations. I would want to aggregate for different combinations and different time buckets. What is the best way to do this in Flink?

    {"keys": [{"name": "campaignId", "type": "integer"},
              {"name": "adId", "type": "integer"},
              {"name": "creativeId", "type": "integer"},
              {"name": "publisherId", "type": "integer"},
              {"name": "adOrderId", "type": "integer"}],
     "timeBuckets": ["1h", "1d"],
     "values": [{"name": "impressions", "type": "integer", "aggregators": ["SUM"]},
                {"name": "clicks", "type": "integer", "aggregators": ["SUM"]},
                {"name": "revenue", "type": "integer"}],
     "dimensions": [{"combination": ["campaignId", "adId"]},
                    {"combination": ["creativeId", "campaignId"]},
                    {"combination": ["campaignId"]},
                    {"combination": ["publisherId", "adOrderId", "campaignId"],
                     "additionalValues": ["revenue:SUM"]}]
    }

thank you,
B
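A note on the "timeBuckets" entries: 1h and 1d correspond to tumbling windows of those lengths, and each event falls into exactly one bucket per length. The plain-Java sketch below (not Flink API code; the class and method names are hypothetical) assigns a timestamp to its buckets by rounding it down to a multiple of the bucket length. It assumes non-negative epoch-millisecond timestamps and no window offset, so 1d buckets start at UTC midnight.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch (not Flink code): maps an event timestamp to the start of the
// tumbling bucket it falls into, once per configured bucket size ("1h", "1d").
// Assumes non-negative epoch-millisecond timestamps and no window offset.
class TimeBuckets {
    static final long HOUR_MS = 60L * 60L * 1000L;
    static final long DAY_MS = 24L * HOUR_MS;

    // Rounds the timestamp down to a multiple of the bucket size.
    static long bucketStart(long timestampMs, long bucketSizeMs) {
        return timestampMs - (timestampMs % bucketSizeMs);
    }

    // Every event lands in exactly one bucket per bucket size.
    static Map<String, Long> assign(long timestampMs) {
        Map<String, Long> starts = new LinkedHashMap<>();
        starts.put("1h", bucketStart(timestampMs, HOUR_MS));
        starts.put("1d", bucketStart(timestampMs, DAY_MS));
        return starts;
    }

    public static void main(String[] args) {
        // 2017-08-13T21:09:00Z in epoch milliseconds
        System.out.println(assign(1502658540000L));
    }
}
```

With two bucket sizes, every key combination in the model is aggregated twice, once per bucket length, which is where the "4 combinations and 2 window periods" count later in the thread comes from.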
Hi Basanth,
Let's assume you have records of the form

    Record = {timestamp, country, state, city, value}

Then you'd like to create aggregates, e.g. the average, for the following combinations?
1) avg per country
2) avg per state and country
3) avg per city, state and country

* You could create three streams and aggregate each individually:

    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;

    DataStream<Record> ds = //...
    // keyBy(String...) returns a KeyedStream whose key is a Tuple of the field values
    KeyedStream<Record, Tuple> ds1 = ds.keyBy("country");
    KeyedStream<Record, Tuple> ds2 = ds.keyBy("country", "state");
    KeyedStream<Record, Tuple> ds3 = ds.keyBy("country", "state", "city");
    // + your aggregation per stream ds1, ds2, ds3

You probably want to do different things for each of the resulting aggregations anyway, so having separate streams is probably right for you.

* Alternatively, you could go with ds1 only and compute the per-state (2) and per-city (3) aggregates yourself in a stateful aggregation function, e.g. in a MapState [1]. At the end of your aggregation window, you could then emit those with different keys to be able to distinguish between them.


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
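Nico's second alternative (key by country only and keep the finer-grained aggregates in keyed state) can be simulated in plain Java to show the bookkeeping involved. This is a sketch under assumptions, not Flink code: GeoRecord and CountryLevelAggregator are hypothetical names, an ordinary TreeMap stands in for the MapState of a single country key, and the window is reduced to one emitAverages() call at its end.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the Record above (timestamp omitted: windowing is not simulated).
class GeoRecord {
    final String country;
    final String state;
    final String city;
    final double value;

    GeoRecord(String country, String state, String city, double value) {
        this.country = country;
        this.state = state;
        this.city = city;
        this.value = value;
    }
}

// Simulates the per-key state of a function running on the stream keyed by country:
// for ONE country and ONE window it keeps {sum, count} per level-tagged sub-key.
// In Flink this map would live in a MapState; here it is a plain TreeMap.
class CountryLevelAggregator {
    private final Map<String, double[]> sumAndCount = new TreeMap<>();

    void add(GeoRecord r) {
        update("country:" + r.country, r.value);
        update("state:" + r.country + "/" + r.state, r.value);
        update("city:" + r.country + "/" + r.state + "/" + r.city, r.value);
    }

    private void update(String subKey, double value) {
        double[] acc = sumAndCount.computeIfAbsent(subKey, k -> new double[2]);
        acc[0] += value; // running sum
        acc[1] += 1;     // running count
    }

    // At the end of the window: one average per sub-key. The "country:", "state:" and
    // "city:" prefixes let downstream operators tell the three levels apart.
    Map<String, Double> emitAverages() {
        Map<String, Double> averages = new TreeMap<>();
        for (Map.Entry<String, double[]> e : sumAndCount.entrySet()) {
            averages.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        sumAndCount.clear(); // state for this window is no longer needed
        return averages;
    }

    public static void main(String[] args) {
        CountryLevelAggregator agg = new CountryLevelAggregator();
        agg.add(new GeoRecord("US", "CA", "San Francisco", 10.0));
        agg.add(new GeoRecord("US", "CA", "Los Angeles", 20.0));
        agg.add(new GeoRecord("US", "NY", "New York", 30.0));
        System.out.println(agg.emitAverages());
    }
}
```

The trade-off is visible here: one keyed stream instead of three, but the function itself has to maintain and emit every finer-grained aggregate.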
[back to the ml...]
also including your other mail's additional content...

> I have been able to do this by the following, repeating it for every
> key + window combination. So in the above case there would be 8 blocks like
> the one below (4 combinations and 2 window periods for each combination).
>
>     modelDataStream.keyBy("campaignId", "adId")
>         .timeWindow(Time.minutes(1))
>         .trigger(CountTrigger.of(2))
>         .reduce(..)

As mentioned in my last email, I only see one way of reducing the duplication (for the key combinations), but it involves more handling on your side and I'd probably not recommend it. Regarding the different windows, I do not see anything you could do differently here.

Maybe Aljoscha (cc'd) has an idea of how to do this better.


Nico

On Monday, 14 August 2017 19:08:29 CEST Basanth Gowda wrote:
> Hi Nico,
> Thank you. This is pretty much what I am doing; I was wondering if there is
> a better way.
>
> If there are 10 dimensions on which I want to aggregate with 2 windows,
> this would become about 20 different combinations.
>
> Thank you
> Basanth
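The reduce(..) body in Basanth's snippet is elided. Purely as an illustration, assuming the impressions and clicks values from the model are summed, the plain-Java sketch below shows one possible reduce function and simulates when CountTrigger.of(2) would emit for a single key and window: the reduce runs on every element, and every second element fires with the running aggregate, which is kept rather than purged. AdStats and CountTriggerSimulation are hypothetical names, and this is a simulation, not Flink code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical partial aggregate for one (campaignId, adId) key.
class AdStats {
    final long impressions;
    final long clicks;

    AdStats(long impressions, long clicks) {
        this.impressions = impressions;
        this.clicks = clicks;
    }

    // One possible body for the elided reduce(..): sum both counters.
    static AdStats reduce(AdStats a, AdStats b) {
        return new AdStats(a.impressions + b.impressions, a.clicks + b.clicks);
    }

    @Override
    public String toString() {
        return "AdStats{impressions=" + impressions + ", clicks=" + clicks + "}";
    }
}

// Simulates CountTrigger.of(maxCount) on one key's window: the reduce runs incrementally
// on every element, and every maxCount-th element fires with the running result.
// The reduced state is kept after firing (it is not purged).
class CountTriggerSimulation {
    static List<AdStats> run(List<AdStats> elements, int maxCount) {
        List<AdStats> emitted = new ArrayList<>();
        AdStats state = null;
        int count = 0;
        for (AdStats e : elements) {
            state = (state == null) ? e : AdStats.reduce(state, e);
            count++;
            if (count >= maxCount) {
                count = 0;          // the trigger resets its element counter
                emitted.add(state); // fire: emit the aggregate so far
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<AdStats> in = Arrays.asList(
                new AdStats(1, 0), new AdStats(2, 1), new AdStats(3, 0),
                new AdStats(4, 1), new AdStats(5, 0));
        System.out.println(run(in, 2));
    }
}
```

With five elements, the simulation emits two results, after the 2nd and the 4th element. Per the Flink windowing documentation, calling trigger() replaces the window assigner's default trigger, so in a real job the fifth element would not be emitted by a time-based firing at the end of the window; that is worth checking against the intended semantics.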
Thanks Nico.

As there are 2 ways to achieve this, which one is better?

1st option -> dataStream.flatMap( ... ) -> this takes in one record and provides me N outputs, depending on my key combinations. The same windowing logic is applied to each of the outputs.

or the one you suggested:

2nd option -> use keyBy to create N streams.

With the first option I would use an external config, and it allows me to change the number of combinations dynamically at runtime. Would that be possible with the 2nd option as well? Can I modify or add a data stream at runtime without restarting?
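Option 1's flatMap step can be sketched in plain Java (not Flink code): each event becomes one output per configured combination, and the combination list is ordinary data handed to the function, for example loaded from the external config, rather than part of the job's structure. The map-of-dimensions input, the "fields|values" key format and all class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Plain-Java sketch of option 1's flatMap step (not Flink code): one input event
// becomes one output per configured combination of dimension fields.
class DimensionExploder {
    // One flatMap output: a composite key plus the measure to aggregate.
    static final class Keyed {
        final String key;
        final long clicks;

        Keyed(String key, long clicks) {
            this.key = key;
            this.clicks = clicks;
        }
    }

    // Hypothetical: in a real job this list would come from the external config.
    private final List<List<String>> combinations;

    DimensionExploder(List<List<String>> combinations) {
        this.combinations = combinations;
    }

    // The key is "<field names>|<field values>", so it also records which
    // combination produced it.
    List<Keyed> flatMap(Map<String, Integer> dimensions, long clicks) {
        List<Keyed> out = new ArrayList<>();
        for (List<String> combo : combinations) {
            StringJoiner values = new StringJoiner(",");
            for (String field : combo) {
                values.add(String.valueOf(dimensions.get(field)));
            }
            out.add(new Keyed(String.join(",", combo) + "|" + values, clicks));
        }
        return out;
    }

    public static void main(String[] args) {
        DimensionExploder exploder = new DimensionExploder(Arrays.asList(
                Arrays.asList("campaignId", "adId"),
                Arrays.asList("creativeId", "campaignId"),
                Arrays.asList("campaignId")));
        Map<String, Integer> event = new HashMap<>();
        event.put("campaignId", 1);
        event.put("adId", 2);
        event.put("creativeId", 3);
        for (Keyed k : exploder.flatMap(event, 5L)) {
            System.out.println(k.key + " -> " + k.clicks);
        }
    }
}
```

The exploded stream would then be keyed once by the key string and windowed once, instead of building one keyBy/window pipeline per combination.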
Hi Basanth,
no, you cannot add data streams or re-wire your program at runtime. As with any other program change, you'd have to take a savepoint (to keep operator state and exactly-once semantics) and restart the new program code from there.

For a few combinations, I'd probably choose the second option for simplicity, but for more combinations, option 1 seems better (mapping your key combinations to different tuple keys, keying by this one and applying the window operations afterwards). Option 2 may also require more slots to be available since it has more operators [1], and it may not be evenly balanced, depending on your input data and the work associated with it. Since option 1's window operators aggregate all the different tuples, load distribution may be better. Other than that, the communication pattern is similar. To get a better understanding of the performance impact, you'd have to benchmark with your aggregation and input data, though.


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/concepts/programming-model.html#parallel-dataflows
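Nico's suggestion for option 1 (map each combination to its own tuple key, key by it, then apply the window operations) depends on the key identifying the combination and not just the field values. A small plain-Java illustration with hypothetical data: (campaignId=1, adId=2) and (creativeId=1, campaignId=2) yield the same value tuple, so without a combination tag their aggregates would be merged. A HashMap grouping stands in for keyBy plus a summing window; it is not Flink code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration (not Flink code): a HashMap grouping stands in for
// keyBy(key) followed by a summing window aggregate.
class CompositeKeyDemo {
    static <K> Map<K, Long> sumByKey(List<K> keys, long clicksPerEvent) {
        Map<K, Long> sums = new HashMap<>();
        for (K key : keys) {
            sums.merge(key, clicksPerEvent, Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // Values only: (campaignId=1, adId=2) and (creativeId=1, campaignId=2)
        // produce the same tuple and are wrongly summed together.
        List<List<Object>> untagged = Arrays.asList(
                Arrays.<Object>asList(1, 2),
                Arrays.<Object>asList(1, 2));
        // Tagging each tuple with its combination keeps the aggregates apart.
        List<List<Object>> tagged = Arrays.asList(
                Arrays.<Object>asList("campaignId,adId", 1, 2),
                Arrays.<Object>asList("creativeId,campaignId", 1, 2));
        System.out.println("untagged groups: " + sumByKey(untagged, 1L));
        System.out.println("tagged groups:   " + sumByKey(tagged, 1L));
    }
}
```

Because all combinations then share one key space, a single window operator handles every combination, which is what allows the more even load distribution mentioned above.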