Local combiner on each mapper in Flink


Local combiner on each mapper in Flink

flint-stone
Hello!

I'm new to Flink and I'm wondering if there is an explicit local combiner for each mapper that I can use to perform a local reduce on each mapper. I looked at https://ci.apache.org/projects/flink/flink-docs-release-0.8/dataset_transformations.html but couldn't find anything that matches.


Thanks!

Le

Re: Local combiner on each mapper in Flink

Kurt Young
Hi,

The document you are looking at is pretty old; you can check the newest version here: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html

Regarding your question, you can use combineGroup.
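For example, a rough sketch on the DataSet API (not tested; the word-count setup and all names are made up for illustration). combineGroup runs within each partition before any data is shuffled, so it acts like a local combiner:

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class CombineGroupExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Long>> words = env.fromElements(
                Tuple2.of("hello", 1L), Tuple2.of("world", 1L), Tuple2.of("hello", 1L));

        DataSet<Tuple2<String, Long>> counts = words
                .groupBy(0)
                // local combine: partial sums, computed before any data is shuffled
                .combineGroup(new GroupCombineFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
                    @Override
                    public void combine(Iterable<Tuple2<String, Long>> values,
                                        Collector<Tuple2<String, Long>> out) {
                        String word = null;
                        long sum = 0;
                        for (Tuple2<String, Long> v : values) {
                            word = v.f0;
                            sum += v.f1;
                        }
                        out.collect(Tuple2.of(word, sum));
                    }
                })
                // global aggregation: merges the partial sums after the shuffle
                .groupBy(0)
                .sum(1);

        counts.print();
    }
}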

Best,
Kurt


Re: Local combiner on each mapper in Flink

flint-stone
Thanks Kurt. Maybe I wasn't clear before: I was wondering if Flink has an implementation of a combiner in the DataStream API (to use after keyBy and windowing).

Thanks again!

Le


Re: Local combiner on each mapper in Flink

Kurt Young
I think you can use WindowedStream.aggregate.
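For example, a rough sketch (not tested; the (word, count) stream and all names are made up, and the add() signature follows the one used elsewhere in this thread, where it returns void and mutates the accumulator):

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowAggregateExample {

    // mutable accumulator holder (add() returns void in this version of the API)
    public static class CountAcc {
        long count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        long windowSize = 1000; // ms, arbitrary for the example

        DataStream<Tuple2<String, Long>> dataStream = env.fromElements(
                Tuple2.of("hello", 1L), Tuple2.of("world", 1L), Tuple2.of("hello", 1L));

        // per-key, per-window count, aggregated incrementally as elements arrive
        DataStream<Long> counts = dataStream
                .keyBy(0)
                .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
                .aggregate(new AggregateFunction<Tuple2<String, Long>, CountAcc, Long>() {
                    @Override
                    public CountAcc createAccumulator() {
                        return new CountAcc();
                    }

                    @Override
                    public void add(Tuple2<String, Long> value, CountAcc acc) {
                        acc.count += value.f1; // incremental, per-element update
                    }

                    @Override
                    public Long getResult(CountAcc acc) {
                        return acc.count;
                    }

                    @Override
                    public CountAcc merge(CountAcc a, CountAcc b) {
                        a.count += b.count;
                        return a;
                    }
                });

        counts.print();
        env.execute("window aggregate example");
    }
}

Because the accumulator is updated as each element arrives, within a key and window this behaves much like a combiner.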

Best,
Kurt


Re: Local combiner on each mapper in Flink

flint-stone
Thanks Kurt. I'm trying out WindowedStream.aggregate right now. Just wondering: is there any way for me to preserve the window after aggregation? More specifically, originally I have something like:

WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> windowStream = dataStream
        .keyBy(0) // id
        .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS));

and then for the reducer I can do:

windowStream.apply(...)

and expect the window information to be preserved.

If I were to use aggregate on the window stream, I would end up with something like:

DataStream<Tuple2<String, Long>> windowStream = dataStream
        .keyBy(0) // id
        .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
        .aggregate(new AggregateFunction<Tuple2<String, Long>, Accumulator, Tuple2<String, Long>>() {
            @Override
            public Accumulator createAccumulator() {
                return null;
            }

            @Override
            public void add(Tuple2<String, Long> value, Accumulator acc) {
            }

            @Override
            public Tuple2<String, Long> getResult(Accumulator acc) {
                return null;
            }

            @Override
            public Accumulator merge(Accumulator a, Accumulator b) {
                return null;
            }
        });

Because it looks like aggregate would only transform a WindowedStream into a DataStream. But for a global aggregation phase (a reducer), should I extract the window again?


Thanks! I apologize if this sounds like a very basic question.


Le

Re: Local combiner on each mapper in Flink

Kurt Young
Do you mean you want to keep the original window while also doing some combine operations inside the window at the same time?
What kind of data do you expect the following operator to receive?
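If the goal is to keep the window metadata, one option might be the aggregate overload that also takes a WindowFunction, so the window is re-attached to each pre-aggregated result. A rough sketch (assuming the 1.3 API; windowStream and someAggregateFunction are placeholders for the WindowedStream and an AggregateFunction producing a Long, as in your earlier mail):

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// emit (key, window end, aggregate) so downstream operators still see the window
DataStream<Tuple3<Tuple, Long, Long>> results = windowStream.aggregate(
        someAggregateFunction, // placeholder: AggregateFunction<Tuple2<String, Long>, ACC, Long>
        new WindowFunction<Long, Tuple3<Tuple, Long, Long>, Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple key, TimeWindow window, Iterable<Long> aggregated,
                              Collector<Tuple3<Tuple, Long, Long>> out) {
                // the iterable holds exactly one element: the pre-aggregated result
                out.collect(Tuple3.of(key, window.getEnd(), aggregated.iterator().next()));
            }
        });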

Best,
Kurt


Re: Local combiner on each mapper in Flink

Fabian Hueske-2
Hi,

In a MapReduce context, combiners are used to reduce the amount of data (1) to shuffle and fully sort (to group the data by key) and (2) to reduce the impact of skewed data.

The question is: why do you need a combiner in your use case?
- To reduce the data to shuffle: you should not use a window operator to pre-aggregate, because keyBy implies a shuffle. Instead, you could implement a ProcessFunction with operator state (see the first sketch below). With this solution you have to implement the windowing logic yourself, i.e., group data into windows based on their timestamps, make sure you don't run out of memory (operator state is kept on the heap), and so on. So this solution needs quite a bit of manual tuning.
- To reduce the impact of skewed data: you can use a window aggregation if you don't mind the shuffle. However, you should add an artificial key attribute to spread the computation for the same original key across more grouping keys (see the second sketch below). Note that Flink assigns grouping keys to workers by hash partitioning. This works well for many distinct keys, but might cause issues in case of low key cardinality. Also note that the state size grows, and the effectiveness decreases, as the cardinality of the artificial key increases.
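Here is a rough sketch of the first approach (all names are made up; checkpointing the map as operator state and state-size safeguards are omitted for brevity, and event-time timestamps are assumed to be assigned upstream):

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Pre-shuffle combiner: partial counts per (word, window start), kept inside
// each parallel instance and flushed once the watermark passes the window end.
// Applied via dataStream.process(new LocalWindowCombiner(windowSize)) before any keyBy.
public class LocalWindowCombiner
        extends ProcessFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>> {

    private final long windowSize;
    // (word, window start) -> partial count; should be snapshotted as operator state
    private final Map<Tuple2<String, Long>, Long> partials = new HashMap<>();

    public LocalWindowCombiner(long windowSize) {
        this.windowSize = windowSize;
    }

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx,
                               Collector<Tuple3<String, Long, Long>> out) {
        long ts = ctx.timestamp();
        long windowStart = ts - (ts % windowSize);
        Tuple2<String, Long> bucket = Tuple2.of(value.f0, windowStart);
        Long current = partials.get(bucket);
        partials.put(bucket, (current == null ? 0L : current) + value.f1);

        // flush every bucket whose window has ended according to the watermark
        long watermark = ctx.timerService().currentWatermark();
        Iterator<Map.Entry<Tuple2<String, Long>, Long>> it = partials.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Tuple2<String, Long>, Long> e = it.next();
            long start = e.getKey().f1;
            if (start + windowSize <= watermark) {
                out.collect(Tuple3.of(e.getKey().f0, start, e.getValue()));
                it.remove();
            }
        }
    }
}

Downstream, a keyBy on the word plus a window aggregation merges the partial counts, so the shuffle carries at most one record per word, window, and parallel instance.

For the second approach, the key salting could look like this (fanOut is an assumed spread factor):

import java.util.Random;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

// (word, count) -> (word, salt, count) with salt in [0, fanOut), so that a hot
// word is spread across up to fanOut parallel window instances
public class SaltKey implements
        MapFunction<Tuple2<String, Long>, Tuple3<String, Integer, Long>> {

    private final int fanOut;
    private final Random rnd = new Random();

    public SaltKey(int fanOut) {
        this.fanOut = fanOut;
    }

    @Override
    public Tuple3<String, Integer, Long> map(Tuple2<String, Long> value) {
        return Tuple3.of(value.f0, rnd.nextInt(fanOut), value.f1);
    }
}

// first aggregate per (word, salt) and window, then merge per word:
// dataStream.map(new SaltKey(8)).keyBy(0, 1).timeWindow(size).sum(2)
//           .keyBy(0).timeWindow(size).sum(2)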

Hope this helps,
Fabian


Re: Local combiner on each mapper in Flink

flint-stone
Thanks guys! That makes more sense now. 

So does it mean that once I start using a window operator, all operations on my WindowedStream are global (across all partitions)? In that case, WindowedStream.aggregate (or sum) would apply to all data after shuffling, instead of to each partition.

If I understand this correctly, when I want to perform some sort of counting within each partition for different words (such as word count), I should avoid using keyBy and instead keep some sort of counting map for each word, while also keeping track of the current timestamp, inside each mapper.

Le

Re: Local combiner on each mapper in Flink

Kien Truong

Hi,

For the batch API, you can use a GroupReduceFunction, which gives you the same benefit as a MapReduce combiner.

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html#combinable-groupreducefunctions
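
For example, a rough sketch (assuming the 1.3 batch API; the word-count setup is made up): a GroupReduceFunction becomes combinable when it also implements GroupCombineFunction, so the runtime applies the combine step locally before the shuffle without a separate combineGroup call.

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// combinable: the runtime runs combine() locally on each partition,
// then reduce() on the fully grouped data after the shuffle
public class CombinableWordCount implements
        GroupReduceFunction<Tuple2<String, Long>, Tuple2<String, Long>>,
        GroupCombineFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    @Override
    public void reduce(Iterable<Tuple2<String, Long>> values,
                       Collector<Tuple2<String, Long>> out) {
        combine(values, out); // same logic: partial counts look like inputs
    }

    @Override
    public void combine(Iterable<Tuple2<String, Long>> values,
                        Collector<Tuple2<String, Long>> out) {
        String word = null;
        long sum = 0;
        for (Tuple2<String, Long> v : values) {
            word = v.f0;
            sum += v.f1;
        }
        out.collect(Tuple2.of(word, sum));
    }
}

// usage: words.groupBy(0).reduceGroup(new CombinableWordCount());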

Regards,

Kien

Re: Local combiner on each mapper in Flink

flint-stone
Hi Kien,

Is there a similar API for DataStream as well?

Thanks!

Le

Re: Local combiner on each mapper in Flink

Kien Truong

Hi,

For the Streaming API, use a ProcessFunction, as Fabian suggested.

You can pretty much do anything with a ProcessFunction :)


Best regards,

Kien

Re: Local combiner on each mapper in Flink

flint-stone
Thanks for the help! I'll try out the ProcessFunction then.

Le