Regarding implementation of aggregate function using a ProcessFunction


Regarding implementation of aggregate function using a ProcessFunction

Gaurav Luthra
Hi,

As we are aware, we currently cannot use a RichAggregateFunction in the
aggregate() method on a windowed stream. So, to access state in a custom
aggregate function, one option is to implement it using a ProcessFunction.
Many developers face this issue, so someone must have implemented it or
tried to. Kindly share your feedback on this, as I need to implement it.

Thanks & Regards
Gaurav Luthra



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Regarding implementation of aggregate function using a ProcessFunction

vino yang
Hi Gaurav,

Why do you think the RichAggregateFunction cannot access the State API? 
RichAggregateFunction inherits from AbstractRichFunction (it provides a RuntimeContext that allows you to access the state API).

Thanks, vino.


Re: Regarding implementation of aggregate function using a ProcessFunction

vino yang
Hi Gaurav,

This is very strange. Can you share your code and the specific exception? Under normal circumstances, it should not throw an exception.

Thanks, vino.

On Fri, Sep 28, 2018 at 3:27 PM Gaurav Luthra <[hidden email]> wrote:
Hi Vino,

RichAggregateFunction can certainly access state. The problem is that the aggregate() method does not accept a RichAggregateFunction.
If we pass one, it throws an exception.

So the only option is to use a plain AggregateFunction (not Rich) with the aggregate() method on a windowed stream. But an AggregateFunction cannot access the RuntimeContext, and hence cannot use state.

Thanks & Regards
Gaurav




Re: Regarding implementation of aggregate function using a ProcessFunction

Gaurav Luthra
Hi Vino,

Kindly check the Flink code below.

Class: org.apache.flink.streaming.api.datastream.WindowedStream

    @PublicEvolving
    public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) {
        checkNotNull(function, "function");

        if (function instanceof RichFunction) {
            throw new UnsupportedOperationException("This aggregation function cannot be a RichFunction.");
        }

        TypeInformation<ACC> accumulatorType = TypeExtractor.getAggregateFunctionAccumulatorType(
            function, input.getType(), null, false);

        TypeInformation<R> resultType = TypeExtractor.getAggregateFunctionReturnType(
            function, input.getType(), null, false);

        return aggregate(function, accumulatorType, resultType);
    }


Kindly check the above snapshot of Flink's aggregate() method, which is the one applied on a windowed stream.

Thanks & Regards
Gaurav Luthra
Mob:- +91-9901945206
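
To make the behaviour of the quoted aggregate() method concrete, here is a minimal plain-Java sketch of the same guard. `RichMarker`, `Agg`, `CountAgg`, `RichCountAgg`, and `WindowedStreamSketch` are hypothetical stand-ins invented for this illustration and are not Flink classes; only the `instanceof` check and the exception message mirror the quoted method.

```java
// Stand-ins for illustration only; these are NOT the Flink interfaces.
interface RichMarker {}                       // plays the role of RichFunction

interface Agg<IN, ACC, OUT> {                 // plays the role of AggregateFunction
    ACC createAccumulator();
    ACC add(IN value, ACC acc);
    OUT getResult(ACC acc);
}

// A plain aggregate that counts elements.
class CountAgg implements Agg<String, Long, Long> {
    public Long createAccumulator() { return 0L; }
    public Long add(String value, Long acc) { return acc + 1; }
    public Long getResult(Long acc) { return acc; }
}

// The same aggregate, additionally tagged as "rich".
class RichCountAgg extends CountAgg implements RichMarker {}

class WindowedStreamSketch {
    // Mirrors the guard at the top of the quoted aggregate() method.
    static <IN, ACC, OUT> Agg<IN, ACC, OUT> aggregate(Agg<IN, ACC, OUT> function) {
        if (function instanceof RichMarker) {
            throw new UnsupportedOperationException(
                "This aggregation function cannot be a RichFunction.");
        }
        return function;
    }

    public static void main(String[] args) {
        aggregate(new CountAgg());            // accepted
        try {
            aggregate(new RichCountAgg());    // rejected by the instanceof check
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The check runs when the job graph is built, so the rejection happens before any record is processed, no matter what the rich function would do at runtime.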



Re: Regarding implementation of aggregate function using a ProcessFunction

vino yang
Hi Gaurav,

Yes, you are right. Using a RichFunction there is indeed not allowed. I will ping Timo; he may be able to give you a more expert answer.

Thanks, vino.


Re: Regarding implementation of aggregate function using a ProcessFunction

Chesnay Schepler
Please see: https://issues.apache.org/jira/browse/FLINK-10250


Re: Regarding implementation of aggregate function using a ProcessFunction

Gaurav Luthra
Hi Chesnay,

I know it is an issue, and that it won't be fixed because of the window merging feature used by session windows.
But I am looking for anyone who has implemented an aggregation function using a ProcessFunction and the process() method instead of an AggregateFunction and the aggregate() method.
I hope you get my point.

Thanks & Regards
Gaurav Luthra
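
The window-merging concern mentioned above can be illustrated without Flink. In the sketch below, `Acc`, `Session`, and `SessionMergeSketch` are hypothetical names, and the rule that overlapping or touching sessions merge is a simplification chosen for this example. The only Flink fact assumed is that `AggregateFunction` declares a `merge(ACC, ACC)` method; the point of the sketch is that the accumulator is the only data a merge step gets to see.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical accumulator for an average.
class Acc {
    long sum;
    long count;
    Acc(long sum, long count) { this.sum = sum; this.count = count; }

    // Same shape as AggregateFunction#merge(ACC, ACC): combine two partial results.
    static Acc merge(Acc a, Acc b) {
        return new Acc(a.sum + b.sum, a.count + b.count);
    }
}

// Hypothetical session window [start, end) together with its accumulator.
class Session {
    long start;
    long end;
    Acc acc;
    Session(long start, long end, Acc acc) {
        this.start = start;
        this.end = end;
        this.acc = acc;
    }
}

class SessionMergeSketch {
    // Merges sessions that overlap or touch. The input must be sorted by start.
    static List<Session> mergeSessions(List<Session> sorted) {
        List<Session> out = new ArrayList<>();
        for (Session s : sorted) {
            Session last = out.isEmpty() ? null : out.get(out.size() - 1);
            if (last != null && s.start <= last.end) {
                // Two sessions collapse into one, and so must their accumulators.
                last.end = Math.max(last.end, s.end);
                last.acc = Acc.merge(last.acc, s.acc);
            } else {
                out.add(new Session(s.start, s.end, s.acc));
            }
        }
        return out;
    }
}
```

Anything a function kept outside the accumulator would have no equivalent of `Acc.merge` here, which, as far as this thread suggests, is the difficulty with rich aggregate functions on merging windows.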




Re: Regarding implementation of aggregate function using a ProcessFunction

Ken Krugler
Hi Gaurav,

I’m curious - for your use case, what are the windowing & aggregation requirements?

E.g. is it a 10 second sliding window?

And what’s the aggregation you’re trying to do?

Thanks,

— Ken



--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


Re: Regarding implementation of aggregate function using a ProcessFunction

Gaurav Luthra
Hi Ken,

Mine is a very generic use case. I am building an aggregation function on Flink that can be configured for any use case.
It will not be tied to a specific use case: every user can plug in their own business logic and use this aggregator to get the result.
The same goes for windowing: the user can configure the type of window, and my aggregator will ask for the properties that window requires.

I hope that gives you some idea.

To make it generic, I need to implement it with a ProcessFunction and the process() method instead of the more specific AggregateFunction and aggregate() method.

So I am looking for input from anyone who has tried implementing aggregation using a ProcessFunction and the process() method, as this is a much-needed capability in Flink.

Thanks and Regards,
Gaurav Luthra
Mob:- +91-9901945206



Re: Regarding implementation of aggregate function using a ProcessFunction

Fabian Hueske-2
Hi,

There are basically three options:
1) Use an AggregateFunction and store everything that you would otherwise put into state in the accumulator. This can become quite expensive, because with RocksDB the accumulator is de/serialized on every function call. The advantage is that you don't have to store all records in state, only the data you need. Simple aggregations like COUNT or SUM are quite cheap.
2) Use Flink's window primitives and a ProcessWindowFunction. In this case, all records of a window are stored in a ListState. Adding a record to the list is cheap, but the state might grow quite large for longer windows. When the window is evaluated, all records are loaded into memory and iterated by the ProcessWindowFunction.
3) Implement the windowing logic yourself in a ProcessFunction. This requires a lot of additional logic, depending on which types of windows you want to support.

Flink's SQL / Table API implements the first approach.
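
The trade-off between options 1 and 2 can be sketched in plain Java by computing an average over the records of one window. `AvgAcc`, `WindowAverageSketch`, `incremental`, and `buffered` are hypothetical names and no Flink classes are used; the comments only indicate which AggregateFunction method each step would roughly correspond to.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical accumulator: all the state option 1 needs for an average.
class AvgAcc {
    long sum;
    long count;
}

class WindowAverageSketch {
    // Option 1 style: fold each record into a small accumulator as it arrives.
    // Only the accumulator is kept, however many records the window receives.
    static double incremental(long[] records) {
        AvgAcc acc = new AvgAcc();                 // createAccumulator()
        for (long r : records) {                   // add(value, acc), once per record
            acc.sum += r;
            acc.count++;
        }
        return acc.count == 0 ? 0.0 : (double) acc.sum / acc.count;  // getResult(acc)
    }

    // Option 2 style: buffer every record, then evaluate once when the window fires.
    // The buffer grows with the number of records in the window.
    static double buffered(long[] records) {
        List<Long> buffer = new ArrayList<>();
        for (long r : records) {
            buffer.add(r);                         // cheap append per record
        }
        long sum = 0;
        for (long r : buffer) {                    // full iteration at evaluation time
            sum += r;
        }
        return buffer.isEmpty() ? 0.0 : (double) sum / buffer.size();
    }

    public static void main(String[] args) {
        long[] window = {4, 8, 15, 16, 23, 42};
        System.out.println(incremental(window));
        System.out.println(buffered(window));
    }
}
```

Both methods return the same value; they differ in what has to be stored between records, which is the cost that the two options trade against each other.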

Best, Fabian


Re: Regarding implementation of aggregate function using a ProcessFunction

Gaurav Luthra
Hi Fabian,

Thanks for explaining in detail. As you mentioned, options 1) and 2) have known issues, so I am continuing with option 3).

Thanks & Regards
Gaurav Luthra
Mob:- +91-9901945206
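
For readers wondering what option 3) involves, below is a minimal plain-Java sketch of hand-written tumbling-window logic for a single key. `ManualTumblingSum`, `processElement`, and `onWatermark` are hypothetical names for this illustration and none of it is Flink API. In an actual keyed process function, the map would presumably live in keyed state and the firing would be driven by event-time timers. The sketch assumes non-negative timestamps and ignores late records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical per-key logic that a process function would have to carry itself.
class ManualTumblingSum {
    private final long windowSize;
    // window start -> running sum; stands in for keyed map state.
    private final TreeMap<Long, Long> sums = new TreeMap<>();

    ManualTumblingSum(long windowSize) {
        this.windowSize = windowSize;
    }

    // Called per record: assign it to its tumbling window and update the aggregate.
    // Assumes timestamp >= 0.
    void processElement(long timestamp, long value) {
        long windowStart = timestamp - (timestamp % windowSize);
        sums.merge(windowStart, value, Long::sum);
    }

    // Called when event time advances: emit and clear every window whose end
    // is at or before the watermark. Returns {windowStart, sum} pairs in order.
    List<long[]> onWatermark(long watermark) {
        List<long[]> fired = new ArrayList<>();
        while (!sums.isEmpty() && sums.firstKey() + windowSize <= watermark) {
            Map.Entry<Long, Long> e = sums.pollFirstEntry();
            fired.add(new long[] {e.getKey(), e.getValue()});
        }
        return fired;
    }
}
```

Even this small version has to handle window assignment, firing, and cleanup by hand. Sliding and session windows, late data, and user-supplied aggregation logic would each add to that, which is the additional logic Fabian refers to.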

