I have a simple aggregation with one caveat: for some reason I have to keep a large amount of state until the window is GCed. The state is held in the accumulator (ACC). I am hitting a memory bottleneck and would like to offload the state to the state backend (RocksDB), keeping only the between-checkpoint state in memory (this seems to be the obvious fix). However, I am not allowed to pass a RichAggregateFunction to the aggregate method of a windowed stream. That raises two questions:
1. Why?
2. Is there an alternative for stateful window aggregation where we manage the state ourselves?
Thanks, Vishal
Here is the code (generics, but it works):
SingleOutputStreamOperator<OUT> retVal = input
It seems that this has to do with session windows being mergeable? I tried the RichWindowFunction, and that seems to suggest that one cannot use state? Any ideas, folks?
On Dec 1, 2017 10:38 AM, "Vishal Santoshi" wrote:
Hi Vishal,
you are right, it is not possible to use state in an AggregateFunction because windows need to be mergeable. An AggregateFunction knows how to merge its accumulators, but merging generic state is not possible. I am not aware of an efficient and easy workaround for this.
If you want to use the provided session window logic, you can use a WindowFunction that performs all computations when the window is triggered. This means that aggregations do not happen eagerly; all events for a window are collected and held in state.
Another approach could be to implement the whole logic (including the session windowing) using a ProcessFunction. This would be a major effort, though.
Best, Fabian
2017-12-06 3:52 GMT+01:00 Vishal Santoshi:
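Why merging matters can be seen in a small plain-Java model of session windows. This is not Flink code: `SessionMergeModel`, `Session` and `add` are hypothetical names, and the accumulator is just a count. When a new element's window overlaps existing sessions, they collapse into one session and their accumulators are combined by `mergeAccumulators`, which plays the role of `AggregateFunction#merge`. For arbitrary user-defined state there is no such function the window operator could call. Boundary handling (whether sessions that merely touch get merged) is simplified here and may differ from Flink's own rule.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not Flink code) of session windows whose accumulators are merged
// when a new element bridges existing sessions. All names are hypothetical.
final class SessionMergeModel {

    // A session window [start, end) together with its accumulator (here: a count).
    static final class Session {
        final long start;
        final long end;
        final long count;

        Session(long start, long end, long count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }
    }

    // Stands in for AggregateFunction#merge: combine two accumulators into one.
    static long mergeAccumulators(long a, long b) {
        return a + b;
    }

    // Assign an element at time ts to a window [ts, ts + gap) and merge every existing
    // session that overlaps it. Existing sessions are assumed to be pairwise disjoint.
    static List<Session> add(List<Session> sessions, long ts, long gap) {
        long start = ts;
        long end = ts + gap;
        long count = 1; // accumulator of the new single-element window
        List<Session> result = new ArrayList<>();
        for (Session s : sessions) {
            boolean overlaps = s.start < end && start < s.end;
            if (overlaps) {
                start = Math.min(start, s.start);
                end = Math.max(end, s.end);
                count = mergeAccumulators(count, s.count);
            } else {
                result.add(s);
            }
        }
        result.add(new Session(start, end, count));
        return result;
    }
}
```

With a gap of 10, elements at 0 and 15 form two sessions; an element at 8 overlaps both, so the three collapse into a single session [0, 25) with a merged count of 3.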
Hi,
If you use an AggregateFunction in this way (i.e. for a window), the ACC should in fact be kept in the state backend. Did you configure the job to use RocksDB? How are the memory problems manifesting?
Best, Aljoscha
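Assuming the RocksDB backend is configured, the window operator keeps the ACC per key and window in serialized form, and to our understanding each add() reads, deserializes, updates, reserializes and writes back the whole accumulator. The plain-Java model below (hypothetical class `SerializedAccStore`, not Flink code) sketches what that implies for a List ACC: only one deserialized copy sits on the heap at a time, but the bytes rewritten grow with the list on every element.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Flink code) of a serializing state backend: the List accumulator
// of each (key, window) is stored as bytes, and every add() does
// read -> deserialize -> append -> serialize -> write. All names are hypothetical.
final class SerializedAccStore {
    private final Map<String, byte[]> store = new HashMap<>();
    long bytesWritten = 0; // total bytes rewritten across all add() calls

    static byte[] serialize(List<Long> acc) {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES * acc.size());
        for (long v : acc) {
            buf.putLong(v);
        }
        return buf.array();
    }

    static List<Long> deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        List<Long> acc = new ArrayList<>();
        while (buf.hasRemaining()) {
            acc.add(buf.getLong());
        }
        return acc;
    }

    // Append one element to the accumulator of the given key and window.
    void add(String key, String window, long element) {
        String stateKey = key + "|" + window;
        byte[] current = store.get(stateKey);
        List<Long> acc = current == null ? new ArrayList<>() : deserialize(current);
        acc.add(element);
        byte[] updated = serialize(acc);
        bytesWritten += updated.length;
        store.put(stateKey, updated);
    }

    List<Long> get(String key, String window) {
        byte[] current = store.get(key + "|" + window);
        return current == null ? new ArrayList<>() : deserialize(current);
    }
}
```

Three adds to the same key and window rewrite 1 + 2 + 3 = 6 longs (48 bytes), so the total work grows quadratically with the number of buffered elements.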
I understand that. Let me elaborate. The sequence of events is:
1. Round-robin dispatch to the Kafka cluster (it is not partitioned on the key, which we may ultimately do, and then I will have more questions on how to keyBy and still keep order, probably avoiding a shuffle :) ).
2. keyBy a high-cardinality key.
3. Sessionize.
4. Because of the round robin on Kafka (and even if it were partitioned on the key, followed by a keyBy), the sort order is not retained, so the ACC has to hold on to the elements in a List. When the window is finalized, we sort the List in the ACC and do pagination. We are looking for event-based paths within a session, from a source event to a sink event.
I was hoping to use RocksDB state as the final merged list (and thus off heap) and use a count-based Trigger to evaluate the ACC and merge the collection gathered between triggers into the master copy, rather than keeping all events in the ACC (I would imagine this is a very general pattern). Does that make sense?
On Fri, Dec 8, 2017 at 9:11 AM, Aljoscha Krettek wrote:
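The buffer, sort and scan idea, plus merging each count-triggered batch into a sorted master copy, can be sketched in plain Java. This is a model under stated assumptions, not Flink code: `SessionPaths`, `Event`, the "SOURCE"/"SINK" type markers and the rule that a path runs from a SOURCE to the next SINK are all hypothetical stand-ins for the real event schema.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java model (not Flink code) of: sort each batch, merge it into a sorted master
// list, then scan the ordered session for SOURCE ... SINK paths. Names are hypothetical.
final class SessionPaths {

    static final class Event {
        final long timestamp;
        final String type;

        Event(long timestamp, String type) {
            this.timestamp = timestamp;
            this.type = type;
        }
    }

    // Sort a batch (e.g. the elements seen since the last count-based firing) and merge
    // it into an already sorted master list in one linear pass.
    static List<Event> mergeSorted(List<Event> master, List<Event> batch) {
        List<Event> sortedBatch = new ArrayList<>(batch);
        sortedBatch.sort(Comparator.comparingLong(e -> e.timestamp));
        List<Event> out = new ArrayList<>(master.size() + sortedBatch.size());
        int i = 0;
        int j = 0;
        while (i < master.size() && j < sortedBatch.size()) {
            if (master.get(i).timestamp <= sortedBatch.get(j).timestamp) {
                out.add(master.get(i++));
            } else {
                out.add(sortedBatch.get(j++));
            }
        }
        while (i < master.size()) out.add(master.get(i++));
        while (j < sortedBatch.size()) out.add(sortedBatch.get(j++));
        return out;
    }

    // Scan a time-ordered session and return every SOURCE ... SINK run as a list of types.
    static List<List<String>> paths(List<Event> sorted) {
        List<List<String>> result = new ArrayList<>();
        List<String> current = null;
        for (Event e : sorted) {
            if ("SOURCE".equals(e.type)) {
                current = new ArrayList<>(); // a new source restarts the path
            }
            if (current != null) {
                current.add(e.type);
                if ("SINK".equals(e.type)) {
                    result.add(current);
                    current = null;
                }
            }
        }
        return result;
    }
}
```

In a Flink job the master list would have to live in keyed state rather than in a local variable; how that state is declared is not shown here.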
An additional question: if the source is key-partitioned (Kafka), does a keyBy retain the order of a Kafka partition across a shuffle?
On Fri, Dec 8, 2017 at 1:12 PM, Vishal Santoshi wrote:
Hi,
the order of records that are sent from one task to another task is preserved (task refers to the parallel instance of an operator). However, a task that receives records from multiple input tasks consumes records from its inputs in arbitrary order. So:
1) Records from the same partition might not be processed by the same operator instance (and hence not in order).
2) Records with the same key are processed by the same operator instance in the same order in which they were read from the partition.
Best, Fabian
2017-12-09 18:09 GMT+01:00 Vishal Santoshi:
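Rule 2 can be illustrated with a plain-Java model of one receiving task (not Flink code; `ChannelOrderModel`, `Record` and the seeded random interleaving are hypothetical). Each input channel is FIFO, while the receiver interleaves its channels in an order modeled here as random. Assuming every key is read from a single source partition, and hence arrives over a single channel, the per-key order survives any interleaving.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Plain-Java model (not Flink code) of a task consuming several FIFO input channels in
// arbitrary order. Arbitrary order is modeled with a seeded Random. Names are hypothetical.
final class ChannelOrderModel {

    static final class Record {
        final String key;
        final int seq; // position of the record within its source partition

        Record(String key, int seq) {
            this.key = key;
            this.seq = seq;
        }
    }

    // Repeatedly take the head of a randomly chosen non-empty channel. Within a channel
    // records are always taken front to back, so channel order is preserved.
    static List<Record> interleave(List<List<Record>> channels, long seed) {
        Random random = new Random(seed);
        int[] heads = new int[channels.size()];
        int remaining = 0;
        for (List<Record> ch : channels) {
            remaining += ch.size();
        }
        List<Record> out = new ArrayList<>(remaining);
        while (remaining > 0) {
            int c = random.nextInt(channels.size());
            if (heads[c] < channels.get(c).size()) {
                out.add(channels.get(c).get(heads[c]++));
                remaining--;
            }
        }
        return out;
    }

    // True if the records of the given key appear with non-decreasing seq numbers.
    static boolean inOrderForKey(List<Record> consumed, String key) {
        int last = -1;
        for (Record r : consumed) {
            if (r.key.equals(key)) {
                if (r.seq < last) {
                    return false;
                }
                last = r.seq;
            }
        }
        return true;
    }
}
```

If the same key were spread over two channels, nothing in this model would keep its records in order, which mirrors rule 1.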
Perfect. In our use case, the Kafka partition key and the keyBy use the exact same field, and thus the order will be preserved.
On Mon, Dec 11, 2017 at 4:34 AM, Fabian Hueske wrote:
Hello Fabian,
We decided that it does not make sense to key-partition the Kafka topic because of hot-spot considerations. So we created a way to keep trimmed state in the accumulator, provided we know the current watermark so that the trimmed state stays time-correct. In essence, the paths we look for in a session's sequence of events are eagerly materialized and emitted using a periodic CountTrigger, followed by truncation of the state. This requires us to know the current watermark in the accumulator. We do have the watermark in the Trigger's onElement(), onEventTime() and onProcessingTime() through the TriggerContext, but I see no way to pass it on to the accumulator. Lazily setting the WM on the element, which we thought was an instance shared between the invocation of add() on the accumulator and onElement() on the attached Trigger, does not seem to work in a distributed environment.
I tried the ProcessWindowFunction too. It was promising, as its process method has the Context and thus the WM, but it suffers from the same issue when using windowState (state scoped to the window and key) in a session window, throwing:
java.lang.UnsupportedOperationException: Per-window state is not allowed when using merging windows.
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$MergingWindowStateStore.getState(WindowOperator.java:720)
Vishal
On Mon, Dec 11, 2017 at 10:13 AM, Vishal Santoshi wrote:
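The "emit completed paths, then truncate" scheme can be sketched in plain Java, assuming whatever performs the firing already knows the current watermark (which, as described above, the TriggerContext and the ProcessWindowFunction's Context expose, but the accumulator does not). This is a model, not Flink code: `TrimmedAccumulator`, `Event`, `fireAndTrim` and the "SOURCE"/"SINK" markers are hypothetical, and it assumes no event arrives with a timestamp at or below a watermark that has already passed.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java model (not Flink code) of a buffer that emits finished paths at each firing
// and then drops the events it no longer needs. All names are hypothetical.
final class TrimmedAccumulator {

    static final class Event {
        final long timestamp;
        final String type;

        Event(long timestamp, String type) {
            this.timestamp = timestamp;
            this.type = type;
        }
    }

    final List<Event> buffer = new ArrayList<>();

    void add(Event e) {
        buffer.add(e);
    }

    // Emit every SOURCE ... SINK path that ends at or before the watermark, then remove
    // all buffered events up to and including the last emitted SINK. Later events are
    // kept because they may still belong to an open path.
    List<List<String>> fireAndTrim(long watermark) {
        buffer.sort(Comparator.comparingLong(e -> e.timestamp));
        List<List<String>> emitted = new ArrayList<>();
        List<String> current = null;
        int cut = 0; // number of leading events that can be discarded
        for (int i = 0; i < buffer.size(); i++) {
            Event e = buffer.get(i);
            if (e.timestamp > watermark) {
                break; // not final yet: earlier-timestamped events may still arrive
            }
            if ("SOURCE".equals(e.type)) {
                current = new ArrayList<>();
            }
            if (current != null) {
                current.add(e.type);
                if ("SINK".equals(e.type)) {
                    emitted.add(current);
                    current = null;
                    cut = i + 1;
                }
            }
        }
        buffer.subList(0, cut).clear();
        return emitted;
    }
}
```

Firing at watermark 4 over events at 1 (SOURCE), 2, 3 (SINK), 5 (SOURCE) and 9 (SINK) emits one path and keeps only the two later events; a second firing at watermark 10 emits the remaining path and empties the buffer.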
In reply to this post by Fabian Hueske-2
Hi Fabian,
We came across this issue while working on a RichAggregateFunction. Isn't generic state mergeable, similar to the ACC merge? What if I need the Flink classloader in the aggregate function? Is there any way I can get it without the RuntimeContext?
Thanks, Chirag
-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/