Aggregator State in Keyed Windowed Stream

Ning Shi
Since the aggregate() function on a keyed window stream does not allow
using rich functions, I can only use an AggregateFunction. Is the
accumulator state of the AggregateFunction backed by RocksDB and
persisted in checkpoints if I use the RocksDB backend? My job looks
like the following:

sourceStream.keyBy(…)
    .timeWindow(Time.seconds(…))
    .trigger(CountTrigger.of(…))
    .aggregate(new MyAggFunc());

Since the stream has high-cardinality keys, each window could have
millions of them. If the accumulator state is not backed by RocksDB,
there might be a lot of data stored on the heap.

And why are rich functions not allowed here?

Thanks,


Ning

Re: Aggregator State in Keyed Windowed Stream

vino yang
Hi Ning,

To answer your question:

> And why are rich functions not allowed here?

If you need access to the state API, you can consider using ProcessWindowFunction[1], which is a rich function.

Thanks, vino.


In reply to Ning Shi's post of Mon, Sep 10, 2018, 11:28 AM.

Re: Aggregator State in Keyed Windowed Stream

vino yang
In addition:

ProcessWindowFunction extends AbstractRichFunction, so through getRuntimeContext() you can access the keyed state API.

In reply to vino yang's post of Mon, Sep 10, 2018, 3:19 PM.

Re: Aggregator State in Keyed Windowed Stream

Ning Shi
In reply to this post by vino yang
Hi Vino,

> If you need access to the state API, you can consider using ProcessWindowFunction[1], which allows you to use ProcessWindowFunction.

I was hoping that I could use the aggregate function to do incremental aggregation. My understanding is that ProcessWindowFunction either has to loop through all records or be combined with an aggregate function to do incremental aggregation.
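To make "incremental aggregation" concrete, here is a minimal sketch in plain Java. The `Agg` interface is a hypothetical stand-in that only mirrors the four methods of Flink's AggregateFunction (createAccumulator, add, getResult, merge), and `SumCount`, `AverageAgg`, and `average` are illustrative names, not part of any job in this thread. The point is that the accumulator holds a running sum and count, so no individual record has to be kept until the window fires.

```java
import java.util.List;

public class IncrementalAverage {
    /** Hypothetical stand-in with the same four methods as Flink's AggregateFunction. */
    interface Agg<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    /** Accumulator: a running sum and count only, never the raw records. */
    static final class SumCount {
        long sum;
        long count;
    }

    static final class AverageAgg implements Agg<Long, SumCount, Double> {
        @Override
        public SumCount createAccumulator() {
            return new SumCount();
        }

        @Override
        public SumCount add(Long value, SumCount acc) {
            // Fold one element into the accumulator as soon as it arrives.
            acc.sum += value;
            acc.count++;
            return acc;
        }

        @Override
        public Double getResult(SumCount acc) {
            return acc.count == 0 ? 0.0 : (double) acc.sum / acc.count;
        }

        @Override
        public SumCount merge(SumCount a, SumCount b) {
            a.sum += b.sum;
            a.count += b.count;
            return a;
        }
    }

    /** Feeds the values one by one, the way a window would as elements arrive. */
    public static double average(List<Long> values) {
        AverageAgg agg = new AverageAgg();
        SumCount acc = agg.createAccumulator();
        for (Long v : values) {
            acc = agg.add(v, acc);
        }
        return agg.getResult(acc);
    }

    public static void main(String[] args) {
        System.out.println(average(List.of(2L, 4L, 9L))); // prints 5.0
    }
}
```

With this shape the state per key and window stays the size of one accumulator, whatever the number of records.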

Back to my first question, is the accumulator state backed by the RocksDB state backend? If so, I don’t need to use a rich function for the aggregate function.

Thanks,

Ning

Re: Aggregator State in Keyed Windowed Stream

Andrey Zagrebin
Hi Ning,

> Back to my first question, is the accumulator state backed by RocksDB state backend? If so, I don’t need to use rich function for the aggregate function.

The answer is yes: it is backed by the state backend (RocksDB, if you configure it).
You can trace it through these method calls:

sourceStream.keyBy(…)
    .timeWindow(Time.seconds(…))
    .trigger(CountTrigger.of(…))
gives you a WindowedStream.
WindowedStream.aggregate(new MyAggFunc()) creates
new WindowOperator(windowStateDescriptor = new AggregatingStateDescriptor()).
Inside WindowOperator:
WindowOperator.open() uses the configured backend to create windowState;
WindowOperator.processElement() uses windowState, which is an AggregatingState.
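
The trace above can be pictured with a deliberately simplified model in plain Java. This is not Flink's WindowOperator: `WindowStateModel`, its `HashMap`, and the string state key are all assumptions made for illustration. It only shows the idea that each element is assigned to a window and folded into the single accumulator stored for that (key, window) pair, with the storage itself delegated to whatever backs `windowState`.

```java
import java.util.HashMap;
import java.util.Map;

public class WindowStateModel {
    // Hypothetical stand-in for the state the configured backend provides:
    // one accumulator (here a running sum) per (key, window) pair.
    private final Map<String, Long> windowState = new HashMap<>();
    private final long windowSizeMs;

    public WindowStateModel(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Plays the role of processElement(): assign a tumbling window,
    // then fold the element into that window's accumulator.
    public void processElement(String key, long timestampMs, long value) {
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        String stateKey = key + "@" + windowStart;
        windowState.merge(stateKey, value, Long::sum);
    }

    public long get(String key, long windowStart) {
        return windowState.getOrDefault(key + "@" + windowStart, 0L);
    }

    // Number of accumulators currently held, one per (key, window) pair.
    public int size() {
        return windowState.size();
    }

    public static void main(String[] args) {
        WindowStateModel op = new WindowStateModel(1000);
        op.processElement("a", 100, 5);
        op.processElement("a", 900, 7);
        op.processElement("a", 1100, 1);
        op.processElement("b", 200, 3);
        System.out.println(op.get("a", 0));    // prints 12
        System.out.println(op.get("a", 1000)); // prints 1
        System.out.println(op.size());         // prints 3
    }
}
```

Four elements produce three accumulators here, which is why the number of keys per window, not the number of records, drives the state size.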

Cheers,
Andrey

In reply to Ning Shi's post of 10 Sep 2018, 13:39.


Re: Aggregator State in Keyed Windowed Stream

Ning Shi
In reply to this post by Ning Shi
> Back to my first question, is the accumulator state backed by RocksDB state backend? If so, I don’t need to use rich function for the aggregate function.

I did some testing and code reading. To answer my own question, the
accumulator state seems to be managed by RocksDB if I use it as the
state backend. The aggregate function is wrapped by
org.apache.flink.contrib.streaming.state.RocksDBAggregatingState. As
long as the accumulator is serializable, it will be stored in RocksDB.
So I don’t seem to need to use a rich function here. Please correct me
if I’m wrong.
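
As a rough illustration of why the accumulator has to be serializable, the sketch below keeps each accumulator as bytes and does a read, update, write-back cycle on every add. Everything in it is an assumption for illustration: the `Map<String, byte[]>` stands in for RocksDB, Java's built-in serialization stands in for Flink's own type serializers, and `SerializedAccumulatorStore` and `SumCount` are made-up names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class SerializedAccumulatorStore {
    /** Illustrative accumulator; it must be serializable to be stored as bytes. */
    static final class SumCount implements Serializable {
        private static final long serialVersionUID = 1L;
        long sum;
        long count;
    }

    // Stand-in for the key-value store: only serialized bytes are kept.
    private final Map<String, byte[]> store = new HashMap<>();

    private static byte[] toBytes(SumCount acc) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(acc);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    private static SumCount fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (SumCount) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Read the stored bytes, update the accumulator, write the bytes back.
    public void add(String key, long value) {
        byte[] existing = store.get(key);
        SumCount acc = existing == null ? new SumCount() : fromBytes(existing);
        acc.sum += value;
        acc.count++;
        store.put(key, toBytes(acc));
    }

    public long sum(String key) {
        byte[] bytes = store.get(key);
        return bytes == null ? 0 : fromBytes(bytes).sum;
    }

    public long count(String key) {
        byte[] bytes = store.get(key);
        return bytes == null ? 0 : fromBytes(bytes).count;
    }

    public static void main(String[] args) {
        SerializedAccumulatorStore s = new SerializedAccumulatorStore();
        s.add("k", 2);
        s.add("k", 3);
        System.out.println(s.sum("k"));   // prints 5
        System.out.println(s.count("k")); // prints 2
    }
}
```

The trade-off this suggests is a serialization round trip on every element in exchange for not holding live accumulator objects on the heap.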

Ning

Re: Aggregator State in Keyed Windowed Stream

Ning Shi
In reply to this post by Andrey Zagrebin
Hi Andrey,

> the answer is yes, it is backed by state backend (should be RocksDB if you configure it),
> you can trace it through these method calls:
>
> sourceStream.keyBy(…)
> .timeWindow(Time.seconds(…))
> .trigger(CountTrigger.of(…))
> gives you WindowedStream,
> WindowedStream.aggregate(new MyAggFunc()) creates:
> new WindowOperator(windowStateDescriptor = new AggregatingStateDescriptor()),
> inside WindowOperator:
> WindowOperator.open() uses configured backend to create windowState,
> WindowOperator.processElement() uses windowState which is AggregatingState.

Thank you for the answer. This is great! It also confirms my
observation that the heap wasn’t growing indefinitely when I did this.

Ning