Aggregation by key hierarchy

Basanth Gowda
Hi,
I want to aggregate hits by Country, State, City. I would have these as tags in my sample data.

How would I do aggregation at different levels? The input data would be a single record.

Should I do a flatMap transformation first and create 3 records from 1 input record, or is there a better way to do it?

thank you,
basanth 

Re: Aggregation by key hierarchy

Basanth Gowda
For example, this is a sample model from one of the Apache Apex presentations.

I would want to aggregate for different key combinations and different time buckets. What is the best way to do this in Flink?

{
  "keys": [
    {"name": "campaignId", "type": "integer"},
    {"name": "adId", "type": "integer"},
    {"name": "creativeId", "type": "integer"},
    {"name": "publisherId", "type": "integer"},
    {"name": "adOrderId", "type": "integer"}
  ],
  "timeBuckets": ["1h", "1d"],
  "values": [
    {"name": "impressions", "type": "integer", "aggregators": ["SUM"]},
    {"name": "clicks", "type": "integer", "aggregators": ["SUM"]},
    {"name": "revenue", "type": "integer"}
  ],
  "dimensions": [
    {"combination": ["campaignId", "adId"]},
    {"combination": ["creativeId", "campaignId"]},
    {"combination": ["campaignId"]},
    {"combination": ["publisherId", "adOrderId", "campaignId"],
     "additionalValues": ["revenue:SUM"]}
  ]
}
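
To make the "timeBuckets" entry concrete, here is a minimal plain-Java sketch of how a record timestamp could be mapped to the start of its 1h or 1d bucket. It is only an illustration of the arithmetic, not Flink API: the names TimeBuckets, toMillis and windowStart are hypothetical, and buckets are assumed to be aligned to the epoch.

```java
// Plain-Java sketch; TimeBuckets, toMillis and windowStart are hypothetical names.
final class TimeBuckets {
    // Converts a bucket label such as "1h" or "1d" to milliseconds.
    static long toMillis(String bucket) {
        long amount = Long.parseLong(bucket.substring(0, bucket.length() - 1));
        char unit = bucket.charAt(bucket.length() - 1);
        switch (unit) {
            case 'h': return amount * 3_600_000L;
            case 'd': return amount * 86_400_000L;
            default: throw new IllegalArgumentException("Unsupported bucket: " + bucket);
        }
    }

    // Start of the epoch-aligned tumbling bucket that contains the timestamp.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }
}
```

Every record whose timestamp falls into the same bucket gets the same window start, so (key combination, window start) identifies one aggregate.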


thank you,
B


Re: Aggregation by key hierarchy

Nico Kruber
Hi Basanth,
Let's assume you have records of the form
Record = {timestamp, country, state, city, value}
Then you'd like to create aggregates, e.g. the average, for the following
combinations?
1) avg per country
2) avg per state and country
3) avg per city and state and country

* You could create three streams and aggregate each individually:
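import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
DataStream<Record> ds = //...
// keyBy returns a KeyedStream; keeping that type leaves the window API available
KeyedStream<Record, Tuple> ds1 = ds.keyBy("country");
KeyedStream<Record, Tuple> ds2 = ds.keyBy("country","state");
KeyedStream<Record, Tuple> ds3 = ds.keyBy("country","state","city");
// + your aggregation per stream ds1, ds2, ds3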

You probably want to do different things for each of the resulting
aggregations anyway, so having separate streams is probably right for you.
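
As an illustration of what the three keyed streams compute, here is a minimal plain-Java sketch that groups a batch of records at each level and averages the value. It uses ordinary collections in place of keyBy and a window, so it is not Flink code; Hit, HierarchyAverages and the "/"-joined key strings are assumptions made for the example.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical record type mirroring Record = {timestamp, country, state, city, value}.
final class Hit {
    final long timestamp;
    final String country, state, city;
    final double value;

    Hit(long timestamp, String country, String state, String city, double value) {
        this.timestamp = timestamp;
        this.country = country;
        this.state = state;
        this.city = city;
        this.value = value;
    }
}

final class HierarchyAverages {
    // Groups by the extracted key and averages the value of each group.
    static Map<String, Double> avgBy(List<Hit> hits, Function<Hit, String> key) {
        return hits.stream()
                .collect(Collectors.groupingBy(key, Collectors.averagingDouble(h -> h.value)));
    }

    // 1) avg per country
    static Map<String, Double> perCountry(List<Hit> hits) {
        return avgBy(hits, h -> h.country);
    }

    // 2) avg per state and country
    static Map<String, Double> perState(List<Hit> hits) {
        return avgBy(hits, h -> h.country + "/" + h.state);
    }

    // 3) avg per city and state and country
    static Map<String, Double> perCity(List<Hit> hits) {
        return avgBy(hits, h -> h.country + "/" + h.state + "/" + h.city);
    }
}
```

Each of the three methods corresponds to one keyed stream plus its aggregation; in a streaming job the list would be the contents of one window.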

* Alternatively, you could go with ds1 only and create the aggregates of the
per-state (2) and per-city (3) ones in a stateful aggregation function
yourself, e.g. in a MapState [1]. At the end of your aggregation window, you
could then emit those with different keys to be able to distinguish between
them.
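
A rough sketch of this second approach, in plain Java: one aggregator per country keeps running sums and counts for the country itself and for every state and city below it, and emits all of them under distinct keys at the end of the window. The TreeMap here only stands in for Flink's MapState, and CountryAggregator, add and emitAndReset are hypothetical names, not part of any Flink API.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical per-country aggregator; the map stands in for keyed MapState.
final class CountryAggregator {
    private final String country;
    // key -> {sum, count}
    private final Map<String, double[]> acc = new TreeMap<>();

    CountryAggregator(String country) {
        this.country = country;
    }

    // Updates the country, state and city level aggregates for one record.
    void add(String state, String city, double value) {
        update(country, value);
        update(country + "/" + state, value);
        update(country + "/" + state + "/" + city, value);
    }

    private void update(String key, double value) {
        double[] sumCount = acc.computeIfAbsent(key, k -> new double[2]);
        sumCount[0] += value;
        sumCount[1] += 1;
    }

    // Called at the end of the window: returns one average per key, then clears the state.
    Map<String, Double> emitAndReset() {
        Map<String, Double> out = new TreeMap<>();
        for (Map.Entry<String, double[]> e : acc.entrySet()) {
            out.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        acc.clear();
        return out;
    }
}
```

The key string tells the consumer which level an emitted average belongs to, which is the "different keys" part of the suggestion.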


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html


Re: Aggregation by key hierarchy

Nico Kruber
[back to the ml...]

also including your other mail's additional content...
> I have been able to do this by the following and repeating this for every
> key + window combination. So in the above case there would be 8 blocks like
> below. (4 combinations and 2 window periods for each combination)
>
> modelDataStream.keyBy("campaignId","adId")
>         .timeWindow(Time.minutes(1))
>         .trigger(CountTrigger.of(2))
>         .reduce(..)

As mentioned in my last email, I see only one way to reduce the duplication
across key combinations, but it requires more handling on your side and
I would probably not recommend it. As for the different windows, I do not see
an alternative to defining each one separately.
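
If the concern is mainly the repeated source code rather than the number of operators, the 4 x 2 pairs can at least be enumerated once and each pair turned into one keyBy/window/reduce chain in a loop when the job is built. The following plain-Java sketch only shows the enumeration; AggregationSpec and crossProduct are hypothetical names, and the resulting job would still contain one window operator per pair.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical holder for one (key combination, window) pair.
final class AggregationSpec {
    final List<String> keyFields;
    final String window;

    AggregationSpec(List<String> keyFields, String window) {
        this.keyFields = keyFields;
        this.window = window;
    }

    // Builds every pairing of a key combination with a window size.
    static List<AggregationSpec> crossProduct(List<List<String>> combinations,
                                              List<String> windows) {
        List<AggregationSpec> specs = new ArrayList<>();
        for (List<String> combination : combinations) {
            for (String window : windows) {
                specs.add(new AggregationSpec(combination, window));
            }
        }
        return specs;
    }
}
```

With the four combinations and two time buckets of the sample model this yields the 8 blocks mentioned above.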

Maybe Aljoscha (cc'd) has an idea of how to do this better.


Nico

On Monday, 14 August 2017 19:08:29 CEST Basanth Gowda wrote:

> Hi Nico,
> Thank you. This is pretty much what I am doing; I was wondering if there is
> a better way.
>
> If there are 10 dimensions on which I want to aggregate with 2 windows,
> this would become about 20 different combinations.
>
> Thank you
> Basanth

Re: Aggregation by key hierarchy

Basanth Gowda
Thanks Nico.

As there are two ways to achieve this, which is better?

1st option -> dataStream.flatMap( ... ) -> this takes one input record and emits N outputs, one per key combination. The same windowing logic is then applied to each output.

or the one you suggested

2nd option -> use keyBy to create N streams

With the first option I would use an external config, which lets me change the number of combinations dynamically at runtime. Would that be possible with the 2nd option as well? Can I modify or add a data stream at runtime without restarting?


Re: Aggregation by key hierarchy

Nico Kruber
Hi Basanth,
No, you cannot add data streams or re-wire your program at runtime.
As with any other program change, you'd have to take a savepoint (to keep
operator state and exactly-once semantics) and restart the new program code
from there.

For a few combinations, I'd probably choose the second option for simplicity,
but for more combinations, option 1 seems better (mapping your key
combinations to different tuple keys, keying by those, and applying the window
operations afterwards).
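
A minimal sketch of the mapping step in option 1, in plain Java: one input record is expanded into one composite key per configured combination, which is what the flatMap would emit before a single keyBy and window. KeyExpander, expand and the "field=value|field=value" key layout are assumptions for illustration only, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper that does the work of the flatMap body.
final class KeyExpander {
    // Emits one composite key per combination, e.g. "campaignId=1|adId=2".
    static List<String> expand(Map<String, Integer> record, List<List<String>> combinations) {
        List<String> keys = new ArrayList<>();
        for (List<String> combination : combinations) {
            StringBuilder key = new StringBuilder();
            for (String field : combination) {
                if (key.length() > 0) {
                    key.append('|');
                }
                key.append(field).append('=').append(record.get(field));
            }
            keys.add(key.toString());
        }
        return keys;
    }
}
```

Putting the field names into the key keeps two combinations with the same values from colliding, so a single keyed window operator can hold the aggregates of all combinations side by side.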

Option 2 may also require more slots to be available since it has more
operators [1] and may not be evenly balanced based on your input data and the
work associated with it. Since option 1's window operators aggregate all
different tuples, load distribution may be better. Other than that, the
communication pattern is similar. To get a better understanding of the
performance impacts, you'd have to benchmark with your aggregation and input
data though.


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/concepts/programming-model.html#parallel-dataflows
