Since the aggregate() function on a keyed window stream does not allow using rich functions, I can only use an AggregateFunction. Is the accumulator state of the AggregateFunction backed by RocksDB and persisted in checkpoints if I use the RocksDB backend?

My job looks like the following:

    sourceStream.keyBy(…)
        .timeWindow(Time.seconds(…))
        .trigger(CountTrigger.of(…))
        .aggregate(new MyAggFunc());

Since the stream has high-cardinality keys, each window could have millions of them. If the accumulator state is not backed by RocksDB, a lot of data might be stored on the heap.

And why are rich functions not allowed here?

Thanks,
Ning
Hi Ning,

To answer your question:

> And why is rich functions not allowed here?

If you need access to the state API, you can consider using ProcessWindowFunction[1], which allows you to use ProcessWindowFunction.

Thanks,
vino.

On Mon, Sep 10, 2018 at 11:28 AM, Ning Shi <[hidden email]> wrote:
> Since the aggregate() function on a keyed window stream does not allow
In addition: ProcessWindowFunction extends AbstractRichFunction, so you can access the keyed state API through getRuntimeContext().

On Mon, Sep 10, 2018 at 3:19 PM, vino yang <[hidden email]> wrote:
In reply to this post by vino yang
Hi Vino,
> If you need access to the state API, you can consider using ProcessWindowFunction[1], which allows you to use ProcessWindowFunction.

I was hoping that I could use the aggregate function to do incremental aggregation. My understanding is that ProcessWindowFunction either has to loop through all records or be combined with an aggregate function to do incremental aggregation.

Back to my first question: is the accumulator state backed by the RocksDB state backend? If so, I don’t need to use a rich function for the aggregate function.

Thanks,
Ning
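To make the distinction Ning draws concrete, here is a minimal sketch of incremental versus buffered aggregation. It deliberately does not use Flink: `SimpleAggregate` is a hypothetical stand-in that mirrors the four methods of Flink's AggregateFunction (createAccumulator, add, getResult, merge), and `SumCount`, `AverageAggregate` and `IncrementalVsBuffered` are illustrative names that do not come from the job above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in with the same four methods as Flink's AggregateFunction<IN, ACC, OUT>.
interface SimpleAggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Accumulator for an average: its size stays constant no matter how many records arrive.
final class SumCount {
    long sum;
    long count;
}

final class AverageAggregate implements SimpleAggregate<Long, SumCount, Double> {
    public SumCount createAccumulator() {
        return new SumCount();
    }

    public SumCount add(Long value, SumCount acc) {
        acc.sum += value;
        acc.count++;
        return acc;
    }

    public Double getResult(SumCount acc) {
        return acc.count == 0 ? 0.0 : (double) acc.sum / acc.count;
    }

    public SumCount merge(SumCount a, SumCount b) {
        a.sum += b.sum;
        a.count += b.count;
        return a;
    }
}

final class IncrementalVsBuffered {
    // Incremental: fold each record into the accumulator as it arrives.
    static double incremental(List<Long> records) {
        AverageAggregate agg = new AverageAggregate();
        SumCount acc = agg.createAccumulator();
        for (Long r : records) {
            acc = agg.add(r, acc);
        }
        return agg.getResult(acc);
    }

    // Buffered: keep every record and iterate over all of them when the window fires.
    static double buffered(List<Long> records) {
        List<Long> buffer = new ArrayList<>(records);
        long sum = 0;
        for (Long r : buffer) {
            sum += r;
        }
        return buffer.isEmpty() ? 0.0 : (double) sum / buffer.size();
    }
}
```

For the records 2, 4 and 9 both paths return 5.0. The difference is in what has to be kept per key and window: the incremental path only ever holds one SumCount, while the buffered path holds every record until the window fires.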
Hi Ning,
> Back to my first question, is the accumulator state backed by RocksDB state backend? If so, I don’t need to use rich function for the aggregate function.

The answer is yes, it is backed by the state backend (which should be RocksDB if you configure it). You can trace it through these method calls:

    sourceStream.keyBy(…)
        .timeWindow(Time.seconds(…))
        .trigger(CountTrigger.of(…))

gives you a WindowedStream.

WindowedStream.aggregate(new MyAggFunc()) creates:

    new WindowOperator(windowStateDescriptor = new AggregatingStateDescriptor())

Inside WindowOperator:

- WindowOperator.open() uses the configured backend to create windowState,
- WindowOperator.processElement() uses windowState, which is an AggregatingState.

Cheers,
Andrey

> On 10 Sep 2018, at 13:39, Ning Shi <[hidden email]> wrote:
>
> Hi Vino,
>
>> If you need access to the state API, you can consider using ProcessWindowFunction[1], which allows you to use ProcessWindowFunction.
>
> I was hoping that I could use the aggregate function to do incremental aggregation. My understanding is that ProcessWindowFunction either has to loop through all records or be combined with an aggregate function to do incremental aggregation.
>
> Back to my first question, is the accumulator state backed by RocksDB state backend? If so, I don’t need to use rich function for the aggregate function.
>
> Thanks,
>
> Ning
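The call chain Andrey describes can be mimicked with a toy model, shown here only to illustrate the shape of the delegation. None of these classes are Flink classes: `ToyAggregatingState` and `ToyWindowOperator` are hypothetical names, a HashMap stands in for the configured state backend, and the sketch assumes state is scoped by key only (the real operator also scopes it by window).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical stand-in for a keyed aggregating state: one accumulator per key,
// held in a store owned by the "backend" rather than by the user function.
final class ToyAggregatingState<IN, ACC> {
    private final Map<String, ACC> store = new HashMap<>(); // stand-in for the state backend
    private final Supplier<ACC> createAccumulator;
    private final BiFunction<IN, ACC, ACC> addFunction;

    ToyAggregatingState(Supplier<ACC> createAccumulator, BiFunction<IN, ACC, ACC> addFunction) {
        this.createAccumulator = createAccumulator;
        this.addFunction = addFunction;
    }

    // Read-modify-write: fetch the accumulator for the key, apply the user's add logic, store it back.
    void add(String key, IN value) {
        ACC acc = store.get(key);
        if (acc == null) {
            acc = createAccumulator.get();
        }
        store.put(key, addFunction.apply(value, acc));
    }

    // Returns the current accumulator for the key, or null if nothing was added yet.
    ACC get(String key) {
        return store.get(key);
    }
}

// Hypothetical stand-in for the window operator: it holds no accumulators itself
// and only forwards each element to the state object.
final class ToyWindowOperator {
    private final ToyAggregatingState<Long, Long> windowState =
            new ToyAggregatingState<Long, Long>(() -> 0L, (value, acc) -> acc + value);

    void processElement(String key, long value) {
        windowState.add(key, value);
    }

    Long currentResult(String key) {
        return windowState.get(key);
    }
}
```

The point of the sketch is that the user-supplied function carries no state of its own. The accumulators live wherever the state object puts them, which is why swapping the backend changes where they are stored without any change to the aggregate function.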
In reply to this post by Ning Shi
> Back to my first question, is the accumulator state backed by RocksDB state backend? If so, I don’t need to use rich function for the aggregate function.
I did some testing and code reading. To answer my own question: the accumulator state seems to be managed by RocksDB if I use it as the state backend. The aggregate function is wrapped by org.apache.flink.contrib.streaming.state.RocksDBAggregatingState. As long as the accumulator is serializable, it will be stored in RocksDB, so I don’t seem to need a rich function here. Please correct me if I’m wrong.

Ning
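As a rough illustration of why the accumulator has to be serializable, the sketch below keeps only bytes per key and deserializes the accumulator on every update. It is a toy under stated assumptions, not Flink code: `SerializedAccumulatorStore` is a hypothetical name, a HashMap of byte arrays stands in for RocksDB, a (sum, count) accumulator is assumed, and java.nio.ByteBuffer stands in for whatever serializer Flink derives for the accumulator type.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

final class SerializedAccumulatorStore {
    // Stand-in for RocksDB: key -> serialized accumulator bytes. No accumulator objects are retained.
    private final Map<String, byte[]> bytesByKey = new HashMap<>();

    // The assumed accumulator is (sum, count), written as two longs (16 bytes).
    static byte[] serialize(long sum, long count) {
        return ByteBuffer.allocate(2 * Long.BYTES).putLong(sum).putLong(count).array();
    }

    // Reads the two longs back in the order they were written: sum first, then count.
    static long[] deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        return new long[] { buf.getLong(), buf.getLong() };
    }

    // Each update deserializes the stored accumulator, applies the new value, and writes it back.
    void add(String key, long value) {
        byte[] stored = bytesByKey.get(key);
        long[] acc = stored == null ? new long[] { 0L, 0L } : deserialize(stored);
        bytesByKey.put(key, serialize(acc[0] + value, acc[1] + 1));
    }

    // Assumes at least one value was added for the key.
    double average(String key) {
        long[] acc = deserialize(bytesByKey.get(key));
        return (double) acc[0] / acc[1];
    }

    int storedSize(String key) {
        return bytesByKey.get(key).length;
    }
}
```

After adding 2, 4 and 9 for one key, the average is 5.0 and the stored value is still 16 bytes. The per-key footprint depends on the accumulator's serialized size, not on how many records were aggregated.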
In reply to this post by Andrey Zagrebin
Hi Andrey,
> the answer is yes, it is backed by state backend (should be RocksDB if you configure it),
> you can trace it through these method calls:
>
> sourceStream.keyBy(…)
> .timeWindow(Time.seconds(…))
> .trigger(CountTrigger.of(…))
> gives you WindowedStream,
> WindowedStream.aggregate(new MyAggFunc()) creates:
> new WindowOperator(windowStateDescriptor = new AggregatingStateDescriptor()),
> inside WindowOperator:
> WindowOperator.open() uses configured backend to create windowState,
> WindowOperator.processElement() uses windowState which is AggregatingState.

Thank you for the answer. This is great! It also confirms my observation that the heap wasn’t growing indefinitely when I did this.

Ning